
root / lib / cmdlib.py @ a2d2e1a7


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33
import logging
34
import copy
35

    
36
from ganeti import ssh
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import locking
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import serializer
45

    
46

    
47
class LogicalUnit(object):
48
  """Logical Unit base class.
49

50
  Subclasses must follow these rules:
51
    - implement ExpandNames
52
    - implement CheckPrereq
53
    - implement Exec
54
    - implement BuildHooksEnv
55
    - redefine HPATH and HTYPE
56
    - optionally redefine their run requirements:
57
        REQ_MASTER: the LU needs to run on the master node
58
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59

60
  Note that all commands require root permissions.
61

62
  """
63
  HPATH = None
64
  HTYPE = None
65
  _OP_REQP = []
66
  REQ_MASTER = True
67
  REQ_BGL = True
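  # Illustrative sketch only (not an LU shipped in this module): a minimal
  # concurrent subclass following the rules above could look like
  #
  #   class LUExampleCheckInstance(NoHooksLU):
  #     _OP_REQP = ["instance_name"]
  #     REQ_BGL = False
  #
  #     def ExpandNames(self):
  #       self._ExpandAndLockInstance()
  #
  #     def CheckPrereq(self):
  #       self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
  #
  #     def Exec(self, feedback_fn):
  #       feedback_fn("Instance %s is defined" % self.instance.name)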
68

    
69
  def __init__(self, processor, op, context, rpc):
70
    """Constructor for LogicalUnit.
71

72
    This needs to be overridden in derived classes in order to check op
73
    validity.
74

75
    """
76
    self.proc = processor
77
    self.op = op
78
    self.cfg = context.cfg
79
    self.context = context
80
    self.rpc = rpc
81
    # Dicts used to declare locking needs to mcpu
82
    self.needed_locks = None
83
    self.acquired_locks = {}
84
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85
    self.add_locks = {}
86
    self.remove_locks = {}
87
    # Used to force good behavior when calling helper functions
88
    self.recalculate_locks = {}
89
    self.__ssh = None
90
    # logging
91
    self.LogWarning = processor.LogWarning
92
    self.LogInfo = processor.LogInfo
93

    
94
    for attr_name in self._OP_REQP:
95
      attr_val = getattr(op, attr_name, None)
96
      if attr_val is None:
97
        raise errors.OpPrereqError("Required parameter '%s' missing" %
98
                                   attr_name)
99

    
100
    if not self.cfg.IsCluster():
101
      raise errors.OpPrereqError("Cluster not initialized yet,"
102
                                 " use 'gnt-cluster init' first.")
103
    if self.REQ_MASTER:
104
      master = self.cfg.GetMasterNode()
105
      if master != utils.HostInfo().name:
106
        raise errors.OpPrereqError("Commands must be run on the master"
107
                                   " node %s" % master)
108

    
109
  def __GetSSH(self):
110
    """Returns the SshRunner object
111

112
    """
113
    if not self.__ssh:
114
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
115
    return self.__ssh
116

    
117
  ssh = property(fget=__GetSSH)
118

    
119
  def ExpandNames(self):
120
    """Expand names for this LU.
121

122
    This method is called before starting to execute the opcode, and it should
123
    update all the parameters of the opcode to their canonical form (e.g. a
124
    short node name must be fully expanded after this method has successfully
125
    completed). This way locking, hooks, logging, etc. can work correctly.
126

127
    LUs which implement this method must also populate the self.needed_locks
128
    member, as a dict with lock levels as keys, and a list of needed lock names
129
    as values. Rules:
130
      - Use an empty dict if you don't need any lock
131
      - If you don't need any lock at a particular level omit that level
132
      - Don't put anything for the BGL level
133
      - If you want all locks at a level use locking.ALL_SET as a value
134

135
    If you need to share locks (rather than acquire them exclusively) at one
136
    level you can modify self.share_locks, setting a true value (usually 1) for
137
    that level. By default locks are not shared.
138

139
    Examples:
140
    # Acquire all nodes and one instance
141
    self.needed_locks = {
142
      locking.LEVEL_NODE: locking.ALL_SET,
143
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
144
    }
145
    # Acquire just two nodes
146
    self.needed_locks = {
147
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
148
    }
149
    # Acquire no locks
150
    self.needed_locks = {} # No, you can't leave it to the default value None
151

152
    """
153
    # The implementation of this method is mandatory only if the new LU is
154
    # concurrent, so that old LUs don't need to be changed all at the same
155
    # time.
156
    if self.REQ_BGL:
157
      self.needed_locks = {} # Exclusive LUs don't need locks.
158
    else:
159
      raise NotImplementedError
160

    
161
  def DeclareLocks(self, level):
162
    """Declare LU locking needs for a level
163

164
    While most LUs can just declare their locking needs at ExpandNames time,
165
    sometimes there's the need to calculate some locks after having acquired
166
    the ones before. This function is called just before acquiring locks at a
167
    particular level, but after acquiring the ones at lower levels, and permits
168
    such calculations. It can be used to modify self.needed_locks, and by
169
    default it does nothing.
170

171
    This function is only called if you have something already set in
172
    self.needed_locks for the level.
173

174
    @param level: Locking level which is going to be locked
175
    @type level: member of ganeti.locking.LEVELS
176

177
    """
178

    
179
  def CheckPrereq(self):
180
    """Check prerequisites for this LU.
181

182
    This method should check that the prerequisites for the execution
183
    of this LU are fulfilled. It can do internode communication, but
184
    it should be idempotent - no cluster or system changes are
185
    allowed.
186

187
    The method should raise errors.OpPrereqError in case something is
188
    not fulfilled. Its return value is ignored.
189

190
    This method should also update all the parameters of the opcode to
191
    their canonical form if it hasn't been done by ExpandNames before.
192

193
    """
194
    raise NotImplementedError
195

    
196
  def Exec(self, feedback_fn):
197
    """Execute the LU.
198

199
    This method should implement the actual work. It should raise
200
    errors.OpExecError for failures that are somewhat dealt with in
201
    code, or expected.
202

203
    """
204
    raise NotImplementedError
205

    
206
  def BuildHooksEnv(self):
207
    """Build hooks environment for this LU.
208

209
    This method should return a three-element tuple consisting of: a dict
210
    containing the environment that will be used for running the
211
    specific hook for this LU, a list of node names on which the hook
212
    should run before the execution, and a list of node names on which
213
    the hook should run after the execution.
214

215
    The keys of the dict must not be prefixed with 'GANETI_', as this will
216
    be handled in the hooks runner. Also note additional keys will be
217
    added by the hooks runner. If the LU doesn't define any
218
    environment, an empty dict (and not None) should be returned.
219

220
    Empty node lists should be returned as empty lists (and not None).
221

222
    Note that if the HPATH for a LU class is None, this function will
223
    not be called.
224

225
    """
226
    raise NotImplementedError
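  # Sketch of a possible override (illustrative only; "node_name" is an
  # assumed opcode field, and the env/node lists follow the pattern used by
  # the cluster-level LUs below):
  #
  #   def BuildHooksEnv(self):
  #     env = {"OP_TARGET": self.op.node_name}
  #     mn = self.cfg.GetMasterNode()
  #     return env, [mn], [mn]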
227

    
228
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
229
    """Notify the LU about the results of its hooks.
230

231
    This method is called every time a hooks phase is executed, and notifies
232
    the Logical Unit about the hooks' result. The LU can then use it to alter
233
    its result based on the hooks.  By default the method does nothing and the
234
    previous result is passed back unchanged but any LU can define it if it
235
    wants to use the local cluster hook-scripts somehow.
236

237
    Args:
238
      phase: the hooks phase that has just been run
239
      hook_results: the results of the multi-node hooks rpc call
240
      feedback_fn: function to send feedback back to the caller
241
      lu_result: the previous result this LU had, or None in the PRE phase.
242

243
    """
244
    return lu_result
245

    
246
  def _ExpandAndLockInstance(self):
247
    """Helper function to expand and lock an instance.
248

249
    Many LUs that work on an instance take its name in self.op.instance_name
250
    and need to expand it and then declare the expanded name for locking. This
251
    function does it, and then updates self.op.instance_name to the expanded
252
    name. It also initializes needed_locks as a dict, if this hasn't been done
253
    before.
254

255
    """
256
    if self.needed_locks is None:
257
      self.needed_locks = {}
258
    else:
259
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
260
        "_ExpandAndLockInstance called with instance-level locks set"
261
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
262
    if expanded_name is None:
263
      raise errors.OpPrereqError("Instance '%s' not known" %
264
                                  self.op.instance_name)
265
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
266
    self.op.instance_name = expanded_name
267

    
268
  def _LockInstancesNodes(self, primary_only=False):
269
    """Helper function to declare instances' nodes for locking.
270

271
    This function should be called after locking one or more instances to lock
272
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
273
    with all primary or secondary nodes for instances already locked and
274
    present in self.needed_locks[locking.LEVEL_INSTANCE].
275

276
    It should be called from DeclareLocks, and for safety only works if
277
    self.recalculate_locks[locking.LEVEL_NODE] is set.
278

279
    In the future it may grow parameters to lock only some instances' nodes, or
280
    to lock only primary or secondary nodes, if needed.
281

282
    It should be called from DeclareLocks in a way similar to:
283

284
    if level == locking.LEVEL_NODE:
285
      self._LockInstancesNodes()
286

287
    @type primary_only: boolean
288
    @param primary_only: only lock primary nodes of locked instances
289

290
    """
291
    assert locking.LEVEL_NODE in self.recalculate_locks, \
292
      "_LockInstancesNodes helper function called with no nodes to recalculate"
293

    
294
    # TODO: check if we've really been called with the instance locks held
295

    
296
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
297
    # future we might want to have different behaviors depending on the value
298
    # of self.recalculate_locks[locking.LEVEL_NODE]
299
    wanted_nodes = []
300
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
301
      instance = self.context.cfg.GetInstanceInfo(instance_name)
302
      wanted_nodes.append(instance.primary_node)
303
      if not primary_only:
304
        wanted_nodes.extend(instance.secondary_nodes)
305

    
306
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
307
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
308
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
309
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
310

    
311
    del self.recalculate_locks[locking.LEVEL_NODE]
312

    
313

    
314
class NoHooksLU(LogicalUnit):
315
  """Simple LU which runs no hooks.
316

317
  This LU is intended as a parent for other LogicalUnits which will
318
  run no hooks, in order to reduce duplicate code.
319

320
  """
321
  HPATH = None
322
  HTYPE = None
323

    
324

    
325
def _GetWantedNodes(lu, nodes):
326
  """Returns list of checked and expanded node names.
327

328
  Args:
329
    nodes: non-empty list of node names (strings) to check and expand
330

331
  """
332
  if not isinstance(nodes, list):
333
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
334

    
335
  if not nodes:
336
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
337
      " non-empty list of nodes whose name is to be expanded.")
338

    
339
  wanted = []
340
  for name in nodes:
341
    node = lu.cfg.ExpandNodeName(name)
342
    if node is None:
343
      raise errors.OpPrereqError("No such node name '%s'" % name)
344
    wanted.append(node)
345

    
346
  return utils.NiceSort(wanted)
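# Usage sketch (illustrative): query-style LUs call this from ExpandNames to
# canonicalize user-supplied names before declaring locks, e.g.
#   self.wanted = _GetWantedNodes(self, self.op.names)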
347

    
348

    
349
def _GetWantedInstances(lu, instances):
350
  """Returns list of checked and expanded instance names.
351

352
  Args:
353
    instances: list of instance names (strings), or an empty list for all
354

355
  """
356
  if not isinstance(instances, list):
357
    raise errors.OpPrereqError("Invalid argument type 'instances'")
358

    
359
  if instances:
360
    wanted = []
361

    
362
    for name in instances:
363
      instance = lu.cfg.ExpandInstanceName(name)
364
      if instance is None:
365
        raise errors.OpPrereqError("No such instance name '%s'" % name)
366
      wanted.append(instance)
367

    
368
  else:
369
    wanted = lu.cfg.GetInstanceList()
370
  return utils.NiceSort(wanted)
371

    
372

    
373
def _CheckOutputFields(static, dynamic, selected):
374
  """Checks whether all selected fields are valid.
375

376
  @type static: L{utils.FieldSet}
377
  @param static: static fields set
378
  @type dynamic: L{utils.FieldSet}
379
  @param dynamic: dynamic fields set
380

381
  """
382
  f = utils.FieldSet()
383
  f.Extend(static)
384
  f.Extend(dynamic)
385

    
386
  delta = f.NonMatching(selected)
387
  if delta:
388
    raise errors.OpPrereqError("Unknown output fields selected: %s"
389
                               % ",".join(delta))
390

    
391

    
392
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
393
                          memory, vcpus, nics):
394
  """Builds instance related env variables for hooks from single variables.
395

396
  Args:
397
    secondary_nodes: List of secondary nodes as strings
398
  """
399
  env = {
400
    "OP_TARGET": name,
401
    "INSTANCE_NAME": name,
402
    "INSTANCE_PRIMARY": primary_node,
403
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
404
    "INSTANCE_OS_TYPE": os_type,
405
    "INSTANCE_STATUS": status,
406
    "INSTANCE_MEMORY": memory,
407
    "INSTANCE_VCPUS": vcpus,
408
  }
409

    
410
  if nics:
411
    nic_count = len(nics)
412
    for idx, (ip, bridge, mac) in enumerate(nics):
413
      if ip is None:
414
        ip = ""
415
      env["INSTANCE_NIC%d_IP" % idx] = ip
416
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
417
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
418
  else:
419
    nic_count = 0
420

    
421
  env["INSTANCE_NIC_COUNT"] = nic_count
422

    
423
  return env
424

    
425

    
426
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
427
  """Builds instance related env variables for hooks from an object.
428

429
  Args:
430
    instance: objects.Instance object of instance
431
    override: dict of values to override
432
  """
433
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
434
  args = {
435
    'name': instance.name,
436
    'primary_node': instance.primary_node,
437
    'secondary_nodes': instance.secondary_nodes,
438
    'os_type': instance.os,
439
    'status': instance.status,
440
    'memory': bep[constants.BE_MEMORY],
441
    'vcpus': bep[constants.BE_VCPUS],
442
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
443
  }
444
  if override:
445
    args.update(override)
446
  return _BuildInstanceHookEnv(**args)
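# Usage sketch (illustrative): instance LUs typically call this helper from
# BuildHooksEnv, e.g.
#   env = _BuildInstanceHookEnvByObject(self, self.instance)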
447

    
448

    
449
def _CheckInstanceBridgesExist(lu, instance):
450
  """Check that the brigdes needed by an instance exist.
451

452
  """
453
  # check bridges existence
454
  brlist = [nic.bridge for nic in instance.nics]
455
  if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
456
    raise errors.OpPrereqError("one or more target bridges %s does not"
457
                               " exist on destination node '%s'" %
458
                               (brlist, instance.primary_node))
459

    
460

    
461
class LUDestroyCluster(NoHooksLU):
462
  """Logical unit for destroying the cluster.
463

464
  """
465
  _OP_REQP = []
466

    
467
  def CheckPrereq(self):
468
    """Check prerequisites.
469

470
    This checks whether the cluster is empty.
471

472
    Any errors are signalled by raising errors.OpPrereqError.
473

474
    """
475
    master = self.cfg.GetMasterNode()
476

    
477
    nodelist = self.cfg.GetNodeList()
478
    if len(nodelist) != 1 or nodelist[0] != master:
479
      raise errors.OpPrereqError("There are still %d node(s) in"
480
                                 " this cluster." % (len(nodelist) - 1))
481
    instancelist = self.cfg.GetInstanceList()
482
    if instancelist:
483
      raise errors.OpPrereqError("There are still %d instance(s) in"
484
                                 " this cluster." % len(instancelist))
485

    
486
  def Exec(self, feedback_fn):
487
    """Destroys the cluster.
488

489
    """
490
    master = self.cfg.GetMasterNode()
491
    if not self.rpc.call_node_stop_master(master, False):
492
      raise errors.OpExecError("Could not disable the master role")
493
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
494
    utils.CreateBackup(priv_key)
495
    utils.CreateBackup(pub_key)
496
    return master
497

    
498

    
499
class LUVerifyCluster(LogicalUnit):
500
  """Verifies the cluster status.
501

502
  """
503
  HPATH = "cluster-verify"
504
  HTYPE = constants.HTYPE_CLUSTER
505
  _OP_REQP = ["skip_checks"]
506
  REQ_BGL = False
507

    
508
  def ExpandNames(self):
509
    self.needed_locks = {
510
      locking.LEVEL_NODE: locking.ALL_SET,
511
      locking.LEVEL_INSTANCE: locking.ALL_SET,
512
    }
513
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
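    # (every level is acquired in shared mode: cluster verification only
    # reads state)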
514

    
515
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
516
                  remote_version, feedback_fn):
517
    """Run multiple tests against a node.
518

519
    Test list:
520
      - compares ganeti version
521
      - checks vg existence and size > 20G
522
      - checks config file checksum
523
      - checks ssh to other nodes
524

525
    Args:
526
      node: name of the node to check
527
      file_list: required list of files
528
      local_cksum: dictionary of local files and their checksums
529

530
    """
531
    # compares ganeti version
532
    local_version = constants.PROTOCOL_VERSION
533
    if not remote_version:
534
      feedback_fn("  - ERROR: connection to %s failed" % (node))
535
      return True
536

    
537
    if local_version != remote_version:
538
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
539
                      (local_version, node, remote_version))
540
      return True
541

    
542
    # checks vg existence and size > 20G
543

    
544
    bad = False
545
    if not vglist:
546
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
547
                      (node,))
548
      bad = True
549
    else:
550
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
551
                                            constants.MIN_VG_SIZE)
552
      if vgstatus:
553
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
554
        bad = True
555

    
556
    if not node_result:
557
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
558
      return True
559

    
560
    # checks config file checksum
561
    # checks ssh connectivity to other nodes
562

    
563
    if 'filelist' not in node_result:
564
      bad = True
565
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
566
    else:
567
      remote_cksum = node_result['filelist']
568
      for file_name in file_list:
569
        if file_name not in remote_cksum:
570
          bad = True
571
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
572
        elif remote_cksum[file_name] != local_cksum[file_name]:
573
          bad = True
574
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
575

    
576
    if 'nodelist' not in node_result:
577
      bad = True
578
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
579
    else:
580
      if node_result['nodelist']:
581
        bad = True
582
        for node in node_result['nodelist']:
583
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
584
                          (node, node_result['nodelist'][node]))
585
    if 'node-net-test' not in node_result:
586
      bad = True
587
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
588
    else:
589
      if node_result['node-net-test']:
590
        bad = True
591
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
592
        for node in nlist:
593
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
594
                          (node, node_result['node-net-test'][node]))
595

    
596
    hyp_result = node_result.get('hypervisor', None)
597
    if isinstance(hyp_result, dict):
598
      for hv_name, hv_result in hyp_result.iteritems():
599
        if hv_result is not None:
600
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
601
                      (hv_name, hv_result))
602
    return bad
603

    
604
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
605
                      node_instance, feedback_fn):
606
    """Verify an instance.
607

608
    This function checks to see if the required block devices are
609
    available on the instance's node.
610

611
    """
612
    bad = False
613

    
614
    node_current = instanceconfig.primary_node
615

    
616
    node_vol_should = {}
617
    instanceconfig.MapLVsByNode(node_vol_should)
618

    
619
    for node in node_vol_should:
620
      for volume in node_vol_should[node]:
621
        if node not in node_vol_is or volume not in node_vol_is[node]:
622
          feedback_fn("  - ERROR: volume %s missing on node %s" %
623
                          (volume, node))
624
          bad = True
625

    
626
    if not instanceconfig.status == 'down':
627
      if (node_current not in node_instance or
628
          not instance in node_instance[node_current]):
629
        feedback_fn("  - ERROR: instance %s not running on node %s" %
630
                        (instance, node_current))
631
        bad = True
632

    
633
    for node in node_instance:
634
      if (not node == node_current):
635
        if instance in node_instance[node]:
636
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
637
                          (instance, node))
638
          bad = True
639

    
640
    return bad
641

    
642
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
643
    """Verify if there are any unknown volumes in the cluster.
644

645
    The .os, .swap and backup volumes are ignored. All other volumes are
646
    reported as unknown.
647

648
    """
649
    bad = False
650

    
651
    for node in node_vol_is:
652
      for volume in node_vol_is[node]:
653
        if node not in node_vol_should or volume not in node_vol_should[node]:
654
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
655
                      (volume, node))
656
          bad = True
657
    return bad
658

    
659
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
660
    """Verify the list of running instances.
661

662
    This checks what instances are running but unknown to the cluster.
663

664
    """
665
    bad = False
666
    for node in node_instance:
667
      for runninginstance in node_instance[node]:
668
        if runninginstance not in instancelist:
669
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
670
                          (runninginstance, node))
671
          bad = True
672
    return bad
673

    
674
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
675
    """Verify N+1 Memory Resilience.
676

677
    Check that if one single node dies we can still start all the instances it
678
    was primary for.
679

680
    """
681
    bad = False
682

    
683
    for node, nodeinfo in node_info.iteritems():
684
      # This code checks that every node which is now listed as secondary has
685
      # enough memory to host all the instances it would take over should a single
686
      # other node in the cluster fail.
687
      # FIXME: not ready for failover to an arbitrary node
688
      # FIXME: does not support file-backed instances
689
      # WARNING: we currently take into account down instances as well as up
690
      # ones, considering that even if they're down someone might want to start
691
      # them even in the event of a node failure.
692
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
693
        needed_mem = 0
694
        for instance in instances:
695
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
696
          if bep[constants.BE_AUTO_BALANCE]:
697
            needed_mem += bep[constants.BE_MEMORY]
698
        if nodeinfo['mfree'] < needed_mem:
699
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
700
                      " failovers should node %s fail" % (node, prinode))
701
          bad = True
702
    return bad
703

    
704
  def CheckPrereq(self):
705
    """Check prerequisites.
706

707
    Transform the list of checks we're going to skip into a set and check that
708
    all its members are valid.
709

710
    """
711
    self.skip_set = frozenset(self.op.skip_checks)
712
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
713
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
714

    
715
  def BuildHooksEnv(self):
716
    """Build hooks env.
717

718
    Cluster-Verify hooks are run only in the post phase; if they fail, their
719
    output is logged in the verify output and the verification fails.
720

721
    """
722
    all_nodes = self.cfg.GetNodeList()
723
    # TODO: populate the environment with useful information for verify hooks
724
    env = {}
725
    return env, [], all_nodes
726

    
727
  def Exec(self, feedback_fn):
728
    """Verify integrity of cluster, performing various test on nodes.
729

730
    """
731
    bad = False
732
    feedback_fn("* Verifying global settings")
733
    for msg in self.cfg.VerifyConfig():
734
      feedback_fn("  - ERROR: %s" % msg)
735

    
736
    vg_name = self.cfg.GetVGName()
737
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
738
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
739
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
740
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
741
    i_non_redundant = [] # Non redundant instances
742
    i_non_a_balanced = [] # Non auto-balanced instances
743
    node_volume = {}
744
    node_instance = {}
745
    node_info = {}
746
    instance_cfg = {}
747

    
748
    # FIXME: verify OS list
749
    # do local checksums
750
    file_names = []
751
    file_names.append(constants.SSL_CERT_FILE)
752
    file_names.append(constants.CLUSTER_CONF_FILE)
753
    local_checksums = utils.FingerprintFiles(file_names)
754

    
755
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
756
    all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
757
    all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
758
    all_vglist = self.rpc.call_vg_list(nodelist)
759
    node_verify_param = {
760
      'filelist': file_names,
761
      'nodelist': nodelist,
762
      'hypervisor': hypervisors,
763
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
764
                        for node in nodeinfo]
765
      }
766
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
767
                                           self.cfg.GetClusterName())
768
    all_rversion = self.rpc.call_version(nodelist)
769
    all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
770
                                        self.cfg.GetHypervisorType())
771

    
772
    cluster = self.cfg.GetClusterInfo()
773
    for node in nodelist:
774
      feedback_fn("* Verifying node %s" % node)
775
      result = self._VerifyNode(node, file_names, local_checksums,
776
                                all_vglist[node], all_nvinfo[node],
777
                                all_rversion[node], feedback_fn)
778
      bad = bad or result
779

    
780
      # node_volume
781
      volumeinfo = all_volumeinfo[node]
782

    
783
      if isinstance(volumeinfo, basestring):
784
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
785
                    (node, volumeinfo[-400:].encode('string_escape')))
786
        bad = True
787
        node_volume[node] = {}
788
      elif not isinstance(volumeinfo, dict):
789
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
790
        bad = True
791
        continue
792
      else:
793
        node_volume[node] = volumeinfo
794

    
795
      # node_instance
796
      nodeinstance = all_instanceinfo[node]
797
      if type(nodeinstance) != list:
798
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
799
        bad = True
800
        continue
801

    
802
      node_instance[node] = nodeinstance
803

    
804
      # node_info
805
      nodeinfo = all_ninfo[node]
806
      if not isinstance(nodeinfo, dict):
807
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
808
        bad = True
809
        continue
810

    
811
      try:
812
        node_info[node] = {
813
          "mfree": int(nodeinfo['memory_free']),
814
          "dfree": int(nodeinfo['vg_free']),
815
          "pinst": [],
816
          "sinst": [],
817
          # dictionary holding all instances this node is secondary for,
818
          # grouped by their primary node. Each key is a cluster node, and each
819
          # value is a list of instances which have the key as primary and the
820
          # current node as secondary.  this is handy to calculate N+1 memory
821
          # availability if you can only failover from a primary to its
822
          # secondary.
823
          "sinst-by-pnode": {},
824
        }
825
      except ValueError:
826
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
827
        bad = True
828
        continue
829

    
830
    node_vol_should = {}
831

    
832
    for instance in instancelist:
833
      feedback_fn("* Verifying instance %s" % instance)
834
      inst_config = self.cfg.GetInstanceInfo(instance)
835
      result =  self._VerifyInstance(instance, inst_config, node_volume,
836
                                     node_instance, feedback_fn)
837
      bad = bad or result
838

    
839
      inst_config.MapLVsByNode(node_vol_should)
840

    
841
      instance_cfg[instance] = inst_config
842

    
843
      pnode = inst_config.primary_node
844
      if pnode in node_info:
845
        node_info[pnode]['pinst'].append(instance)
846
      else:
847
        feedback_fn("  - ERROR: instance %s, connection to primary node"
848
                    " %s failed" % (instance, pnode))
849
        bad = True
850

    
851
      # If the instance is non-redundant we cannot survive losing its primary
852
      # node, so we are not N+1 compliant. On the other hand we have no disk
853
      # templates with more than one secondary so that situation is not well
854
      # supported either.
855
      # FIXME: does not support file-backed instances
856
      if len(inst_config.secondary_nodes) == 0:
857
        i_non_redundant.append(instance)
858
      elif len(inst_config.secondary_nodes) > 1:
859
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
860
                    % instance)
861

    
862
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
863
        i_non_a_balanced.append(instance)
864

    
865
      for snode in inst_config.secondary_nodes:
866
        if snode in node_info:
867
          node_info[snode]['sinst'].append(instance)
868
          if pnode not in node_info[snode]['sinst-by-pnode']:
869
            node_info[snode]['sinst-by-pnode'][pnode] = []
870
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
871
        else:
872
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
873
                      " %s failed" % (instance, snode))
874

    
875
    feedback_fn("* Verifying orphan volumes")
876
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
877
                                       feedback_fn)
878
    bad = bad or result
879

    
880
    feedback_fn("* Verifying remaining instances")
881
    result = self._VerifyOrphanInstances(instancelist, node_instance,
882
                                         feedback_fn)
883
    bad = bad or result
884

    
885
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
886
      feedback_fn("* Verifying N+1 Memory redundancy")
887
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
888
      bad = bad or result
889

    
890
    feedback_fn("* Other Notes")
891
    if i_non_redundant:
892
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
893
                  % len(i_non_redundant))
894

    
895
    if i_non_a_balanced:
896
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
897
                  % len(i_non_a_balanced))
898

    
899
    return not bad
900

    
901
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
902
    """Analize the post-hooks' result, handle it, and send some
903
    nicely-formatted feedback back to the user.
904

905
    Args:
906
      phase: the hooks phase that has just been run
907
      hooks_results: the results of the multi-node hooks rpc call
908
      feedback_fn: function to send feedback back to the caller
909
      lu_result: previous Exec result
910

911
    """
912
    # We only really run POST phase hooks, and are only interested in
913
    # their results
914
    if phase == constants.HOOKS_PHASE_POST:
915
      # Used to change hooks' output to proper indentation
916
      indent_re = re.compile('^', re.M)
917
      feedback_fn("* Hooks Results")
918
      if not hooks_results:
919
        feedback_fn("  - ERROR: general communication failure")
920
        lu_result = 1
921
      else:
922
        for node_name in hooks_results:
923
          show_node_header = True
924
          res = hooks_results[node_name]
925
          if res is False or not isinstance(res, list):
926
            feedback_fn("    Communication failure")
927
            lu_result = 1
928
            continue
929
          for script, hkr, output in res:
930
            if hkr == constants.HKR_FAIL:
931
              # The node header is only shown once, if there are
932
              # failing hooks on that node
933
              if show_node_header:
934
                feedback_fn("  Node %s:" % node_name)
935
                show_node_header = False
936
              feedback_fn("    ERROR: Script %s failed, output:" % script)
937
              output = indent_re.sub('      ', output)
938
              feedback_fn("%s" % output)
939
              lu_result = 1
940

    
941
      return lu_result
942

    
943

    
944
class LUVerifyDisks(NoHooksLU):
945
  """Verifies the cluster disks status.
946

947
  """
948
  _OP_REQP = []
949
  REQ_BGL = False
950

    
951
  def ExpandNames(self):
952
    self.needed_locks = {
953
      locking.LEVEL_NODE: locking.ALL_SET,
954
      locking.LEVEL_INSTANCE: locking.ALL_SET,
955
    }
956
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
957

    
958
  def CheckPrereq(self):
959
    """Check prerequisites.
960

961
    This has no prerequisites.
962

963
    """
964
    pass
965

    
966
  def Exec(self, feedback_fn):
967
    """Verify integrity of cluster disks.
968

969
    """
970
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
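    # result fields: res_nodes - nodes that could not be queried;
    # res_nlvm - per-node LVM error messages; res_instances - instances with
    # at least one logical volume not online; res_missing - instance name
    # mapped to the list of (node, volume) pairs missing on disk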
971

    
972
    vg_name = self.cfg.GetVGName()
973
    nodes = utils.NiceSort(self.cfg.GetNodeList())
974
    instances = [self.cfg.GetInstanceInfo(name)
975
                 for name in self.cfg.GetInstanceList()]
976

    
977
    nv_dict = {}
978
    for inst in instances:
979
      inst_lvs = {}
980
      if (inst.status != "up" or
981
          inst.disk_template not in constants.DTS_NET_MIRROR):
982
        continue
983
      inst.MapLVsByNode(inst_lvs)
984
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
985
      for node, vol_list in inst_lvs.iteritems():
986
        for vol in vol_list:
987
          nv_dict[(node, vol)] = inst
988

    
989
    if not nv_dict:
990
      return result
991

    
992
    node_lvs = self.rpc.call_volume_list(nodes, vg_name)
993

    
994
    to_act = set()
995
    for node in nodes:
996
      # node_volume
997
      lvs = node_lvs[node]
998

    
999
      if isinstance(lvs, basestring):
1000
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1001
        res_nlvm[node] = lvs
1002
      elif not isinstance(lvs, dict):
1003
        logging.warning("Connection to node %s failed or invalid data"
1004
                        " returned", node)
1005
        res_nodes.append(node)
1006
        continue
1007

    
1008
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1009
        inst = nv_dict.pop((node, lv_name), None)
1010
        if (not lv_online and inst is not None
1011
            and inst.name not in res_instances):
1012
          res_instances.append(inst.name)
1013

    
1014
    # any leftover items in nv_dict are missing LVs, let's arrange the
1015
    # data better
1016
    for key, inst in nv_dict.iteritems():
1017
      if inst.name not in res_missing:
1018
        res_missing[inst.name] = []
1019
      res_missing[inst.name].append(key)
1020

    
1021
    return result
1022

    
1023

    
1024
class LURenameCluster(LogicalUnit):
1025
  """Rename the cluster.
1026

1027
  """
1028
  HPATH = "cluster-rename"
1029
  HTYPE = constants.HTYPE_CLUSTER
1030
  _OP_REQP = ["name"]
1031

    
1032
  def BuildHooksEnv(self):
1033
    """Build hooks env.
1034

1035
    """
1036
    env = {
1037
      "OP_TARGET": self.cfg.GetClusterName(),
1038
      "NEW_NAME": self.op.name,
1039
      }
1040
    mn = self.cfg.GetMasterNode()
1041
    return env, [mn], [mn]
1042

    
1043
  def CheckPrereq(self):
1044
    """Verify that the passed name is a valid one.
1045

1046
    """
1047
    hostname = utils.HostInfo(self.op.name)
1048

    
1049
    new_name = hostname.name
1050
    self.ip = new_ip = hostname.ip
1051
    old_name = self.cfg.GetClusterName()
1052
    old_ip = self.cfg.GetMasterIP()
1053
    if new_name == old_name and new_ip == old_ip:
1054
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1055
                                 " cluster has changed")
1056
    if new_ip != old_ip:
1057
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1058
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1059
                                   " reachable on the network. Aborting." %
1060
                                   new_ip)
1061

    
1062
    self.op.name = new_name
1063

    
1064
  def Exec(self, feedback_fn):
1065
    """Rename the cluster.
1066

1067
    """
1068
    clustername = self.op.name
1069
    ip = self.ip
1070

    
1071
    # shutdown the master IP
1072
    master = self.cfg.GetMasterNode()
1073
    if not self.rpc.call_node_stop_master(master, False):
1074
      raise errors.OpExecError("Could not disable the master role")
1075

    
1076
    try:
1077
      # modify the sstore
1078
      # TODO: sstore
1079
      ss.SetKey(ss.SS_MASTER_IP, ip)
1080
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1081

    
1082
      # Distribute updated ss config to all nodes
1083
      myself = self.cfg.GetNodeInfo(master)
1084
      dist_nodes = self.cfg.GetNodeList()
1085
      if myself.name in dist_nodes:
1086
        dist_nodes.remove(myself.name)
1087

    
1088
      logging.debug("Copying updated ssconf data to all nodes")
1089
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1090
        fname = ss.KeyToFilename(keyname)
1091
        result = self.rpc.call_upload_file(dist_nodes, fname)
1092
        for to_node in dist_nodes:
1093
          if not result[to_node]:
1094
            self.LogWarning("Copy of file %s to node %s failed",
1095
                            fname, to_node)
1096
    finally:
1097
      if not self.rpc.call_node_start_master(master, False):
1098
        self.LogWarning("Could not re-enable the master role on"
1099
                        " the master, please restart manually.")
1100

    
1101

    
1102
def _RecursiveCheckIfLVMBased(disk):
1103
  """Check if the given disk or its children are lvm-based.
1104

1105
  Args:
1106
    disk: ganeti.objects.Disk object
1107

1108
  Returns:
1109
    boolean indicating whether a LD_LV dev_type was found or not
1110

1111
  """
1112
  if disk.children:
1113
    for chdisk in disk.children:
1114
      if _RecursiveCheckIfLVMBased(chdisk):
1115
        return True
1116
  return disk.dev_type == constants.LD_LV
1117

    
1118

    
1119
class LUSetClusterParams(LogicalUnit):
1120
  """Change the parameters of the cluster.
1121

1122
  """
1123
  HPATH = "cluster-modify"
1124
  HTYPE = constants.HTYPE_CLUSTER
1125
  _OP_REQP = []
1126
  REQ_BGL = False
1127

    
1128
  def ExpandNames(self):
1129
    # FIXME: in the future maybe other cluster params won't require checking on
1130
    # all nodes to be modified.
1131
    self.needed_locks = {
1132
      locking.LEVEL_NODE: locking.ALL_SET,
1133
    }
1134
    self.share_locks[locking.LEVEL_NODE] = 1
1135

    
1136
  def BuildHooksEnv(self):
1137
    """Build hooks env.
1138

1139
    """
1140
    env = {
1141
      "OP_TARGET": self.cfg.GetClusterName(),
1142
      "NEW_VG_NAME": self.op.vg_name,
1143
      }
1144
    mn = self.cfg.GetMasterNode()
1145
    return env, [mn], [mn]
1146

    
1147
  def CheckPrereq(self):
1148
    """Check prerequisites.
1149

1150
    This checks whether the given params don't conflict and
1151
    if the given volume group is valid.
1152

1153
    """
1154
    # FIXME: This only works because there is only one parameter that can be
1155
    # changed or removed.
1156
    if self.op.vg_name is not None and not self.op.vg_name:
1157
      instances = self.cfg.GetAllInstancesInfo().values()
1158
      for inst in instances:
1159
        for disk in inst.disks:
1160
          if _RecursiveCheckIfLVMBased(disk):
1161
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1162
                                       " lvm-based instances exist")
1163

    
1164
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1165

    
1166
    # if vg_name not None, checks given volume group on all nodes
1167
    if self.op.vg_name:
1168
      vglist = self.rpc.call_vg_list(node_list)
1169
      for node in node_list:
1170
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1171
                                              constants.MIN_VG_SIZE)
1172
        if vgstatus:
1173
          raise errors.OpPrereqError("Error on node '%s': %s" %
1174
                                     (node, vgstatus))
1175

    
1176
    self.cluster = cluster = self.cfg.GetClusterInfo()
1177
    # beparams changes do not need validation (we can't validate?),
1178
    # but we still process here
1179
    if self.op.beparams:
1180
      self.new_beparams = cluster.FillDict(
1181
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1182

    
1183
    # hypervisor list/parameters
1184
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1185
    if self.op.hvparams:
1186
      if not isinstance(self.op.hvparams, dict):
1187
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1188
      for hv_name, hv_dict in self.op.hvparams.items():
1189
        if hv_name not in self.new_hvparams:
1190
          self.new_hvparams[hv_name] = hv_dict
1191
        else:
1192
          self.new_hvparams[hv_name].update(hv_dict)
1193

    
1194
    if self.op.enabled_hypervisors is not None:
1195
      self.hv_list = self.op.enabled_hypervisors
1196
    else:
1197
      self.hv_list = cluster.enabled_hypervisors
1198

    
1199
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1200
      # either the enabled list has changed, or the parameters have, validate
1201
      for hv_name, hv_params in self.new_hvparams.items():
1202
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1203
            (self.op.enabled_hypervisors and
1204
             hv_name in self.op.enabled_hypervisors)):
1205
          # either this is a new hypervisor, or its parameters have changed
1206
          hv_class = hypervisor.GetHypervisor(hv_name)
1207
          hv_class.CheckParameterSyntax(hv_params)
1208
          _CheckHVParams(self, node_list, hv_name, hv_params)
1209

    
1210
  def Exec(self, feedback_fn):
1211
    """Change the parameters of the cluster.
1212

1213
    """
1214
    if self.op.vg_name is not None:
1215
      if self.op.vg_name != self.cfg.GetVGName():
1216
        self.cfg.SetVGName(self.op.vg_name)
1217
      else:
1218
        feedback_fn("Cluster LVM configuration already in desired"
1219
                    " state, not changing")
1220
    if self.op.hvparams:
1221
      self.cluster.hvparams = self.new_hvparams
1222
    if self.op.enabled_hypervisors is not None:
1223
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1224
    if self.op.beparams:
1225
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1226
    self.cfg.Update(self.cluster)
1227

    
1228

    
1229
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1230
  """Sleep and poll for an instance's disk to sync.
1231

1232
  """
1233
  if not instance.disks:
1234
    return True
1235

    
1236
  if not oneshot:
1237
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1238

    
1239
  node = instance.primary_node
1240

    
1241
  for dev in instance.disks:
1242
    lu.cfg.SetDiskID(dev, node)
1243

    
1244
  retries = 0
1245
  while True:
1246
    max_time = 0
1247
    done = True
1248
    cumul_degraded = False
1249
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1250
    if not rstats:
1251
      lu.LogWarning("Can't get any data from node %s", node)
1252
      retries += 1
1253
      if retries >= 10:
1254
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1255
                                 " aborting." % node)
1256
      time.sleep(6)
1257
      continue
1258
    retries = 0
1259
    for i in range(len(rstats)):
1260
      mstat = rstats[i]
1261
      if mstat is None:
1262
        lu.LogWarning("Can't compute data for node %s/%s",
1263
                           node, instance.disks[i].iv_name)
1264
        continue
1265
      # we ignore the ldisk parameter
1266
      perc_done, est_time, is_degraded, _ = mstat
1267
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1268
      if perc_done is not None:
1269
        done = False
1270
        if est_time is not None:
1271
          rem_time = "%d estimated seconds remaining" % est_time
1272
          max_time = est_time
1273
        else:
1274
          rem_time = "no time estimate"
1275
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1276
                        (instance.disks[i].iv_name, perc_done, rem_time))
1277
    if done or oneshot:
1278
      break
1279

    
1280
    time.sleep(min(60, max_time))
1281

    
1282
  if done:
1283
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1284
  return not cumul_degraded
1285

    
1286

    
1287
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1288
  """Check that mirrors are not degraded.
1289

1290
  The ldisk parameter, if True, will change the test from the
1291
  is_degraded attribute (which represents overall non-ok status for
1292
  the device(s)) to the ldisk (representing the local storage status).
1293

1294
  """
1295
  lu.cfg.SetDiskID(dev, node)
1296
  if ldisk:
1297
    idx = 6
1298
  else:
1299
    idx = 5
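  # (idx selects the field of the call_blockdev_find() result: position 5 is
  # the overall is_degraded flag, position 6 the local-storage/ldisk status)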
1300

    
1301
  result = True
1302
  if on_primary or dev.AssembleOnSecondary():
1303
    rstats = lu.rpc.call_blockdev_find(node, dev)
1304
    if not rstats:
1305
      logging.warning("Node %s: disk degraded, not found or node down", node)
1306
      result = False
1307
    else:
1308
      result = result and (not rstats[idx])
1309
  if dev.children:
1310
    for child in dev.children:
1311
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1312

    
1313
  return result
1314

    
1315

    
1316
class LUDiagnoseOS(NoHooksLU):
1317
  """Logical unit for OS diagnose/query.
1318

1319
  """
1320
  _OP_REQP = ["output_fields", "names"]
1321
  REQ_BGL = False
1322
  _FIELDS_STATIC = utils.FieldSet()
1323
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1324

    
1325
  def ExpandNames(self):
1326
    if self.op.names:
1327
      raise errors.OpPrereqError("Selective OS query not supported")
1328

    
1329
    _CheckOutputFields(static=self._FIELDS_STATIC,
1330
                       dynamic=self._FIELDS_DYNAMIC,
1331
                       selected=self.op.output_fields)
1332

    
1333
    # Lock all nodes, in shared mode
1334
    self.needed_locks = {}
1335
    self.share_locks[locking.LEVEL_NODE] = 1
1336
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1337

    
1338
  def CheckPrereq(self):
1339
    """Check prerequisites.
1340

1341
    """
1342

    
1343
  @staticmethod
1344
  def _DiagnoseByOS(node_list, rlist):
1345
    """Remaps a per-node return list into an a per-os per-node dictionary
1346

1347
      Args:
1348
        node_list: a list with the names of all nodes
1349
        rlist: a map with node names as keys and OS objects as values
1350

1351
      Returns:
1352
        map: a map with osnames as keys and as value another map, with
1353
             nodes as keys and lists of OS objects as values,
1354
1355
             e.g. {"debian-etch": {"node1": [<object>,...],
1356
                                   "node2": [<object>,]}
1357
                  }
1358

1359
    """
1360
    all_os = {}
1361
    for node_name, nr in rlist.iteritems():
1362
      if not nr:
1363
        continue
1364
      for os_obj in nr:
1365
        if os_obj.name not in all_os:
1366
          # build a list of nodes for this os containing empty lists
1367
          # for each node in node_list
1368
          all_os[os_obj.name] = {}
1369
          for nname in node_list:
1370
            all_os[os_obj.name][nname] = []
1371
        all_os[os_obj.name][node_name].append(os_obj)
1372
    return all_os
1373

    
1374
  def Exec(self, feedback_fn):
1375
    """Compute the list of OSes.
1376

1377
    """
1378
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1379
    node_data = self.rpc.call_os_diagnose(node_list)
1380
    if node_data == False:
1381
      raise errors.OpExecError("Can't gather the list of OSes")
1382
    pol = self._DiagnoseByOS(node_list, node_data)
1383
    output = []
1384
    for os_name, os_data in pol.iteritems():
1385
      row = []
1386
      for field in self.op.output_fields:
1387
        if field == "name":
1388
          val = os_name
1389
        elif field == "valid":
1390
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1391
        elif field == "node_status":
1392
          val = {}
1393
          for node_name, nos_list in os_data.iteritems():
1394
            val[node_name] = [(v.status, v.path) for v in nos_list]
1395
        else:
1396
          raise errors.ParameterError(field)
1397
        row.append(val)
1398
      output.append(row)
1399

    
1400
    return output
1401

    
1402

    
1403
class LURemoveNode(LogicalUnit):
1404
  """Logical unit for removing a node.
1405

1406
  """
1407
  HPATH = "node-remove"
1408
  HTYPE = constants.HTYPE_NODE
1409
  _OP_REQP = ["node_name"]
1410

    
1411
  def BuildHooksEnv(self):
1412
    """Build hooks env.
1413

1414
    This doesn't run on the target node in the pre phase as a failed
1415
    node would then be impossible to remove.
1416

1417
    """
1418
    env = {
1419
      "OP_TARGET": self.op.node_name,
1420
      "NODE_NAME": self.op.node_name,
1421
      }
1422
    all_nodes = self.cfg.GetNodeList()
1423
    all_nodes.remove(self.op.node_name)
1424
    return env, all_nodes, all_nodes
1425

    
1426
  def CheckPrereq(self):
1427
    """Check prerequisites.
1428

1429
    This checks:
1430
     - the node exists in the configuration
1431
     - it does not have primary or secondary instances
1432
     - it's not the master
1433

1434
    Any errors are signalled by raising errors.OpPrereqError.
1435

1436
    """
1437
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1438
    if node is None:
1439
      raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1440

    
1441
    instance_list = self.cfg.GetInstanceList()
1442

    
1443
    masternode = self.cfg.GetMasterNode()
1444
    if node.name == masternode:
1445
      raise errors.OpPrereqError("Node is the master node,"
1446
                                 " you need to failover first.")
1447

    
1448
    for instance_name in instance_list:
1449
      instance = self.cfg.GetInstanceInfo(instance_name)
1450
      if node.name == instance.primary_node:
1451
        raise errors.OpPrereqError("Instance %s still running on the node,"
1452
                                   " please remove first." % instance_name)
1453
      if node.name in instance.secondary_nodes:
1454
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1455
                                   " please remove first." % instance_name)
1456
    self.op.node_name = node.name
1457
    self.node = node
1458

    
1459
  def Exec(self, feedback_fn):
1460
    """Removes the node from the cluster.
1461

1462
    """
1463
    node = self.node
1464
    logging.info("Stopping the node daemon and removing configs from node %s",
1465
                 node.name)
1466

    
1467
    self.context.RemoveNode(node.name)
1468

    
1469
    self.rpc.call_node_leave_cluster(node.name)
1470

    
1471

    
1472
class LUQueryNodes(NoHooksLU):
1473
  """Logical unit for querying nodes.
1474

1475
  """
1476
  _OP_REQP = ["output_fields", "names"]
1477
  REQ_BGL = False
1478
  _FIELDS_DYNAMIC = utils.FieldSet(
1479
    "dtotal", "dfree",
1480
    "mtotal", "mnode", "mfree",
1481
    "bootid",
1482
    "ctotal",
1483
    )
1484

    
1485
  _FIELDS_STATIC = utils.FieldSet(
1486
    "name", "pinst_cnt", "sinst_cnt",
1487
    "pinst_list", "sinst_list",
1488
    "pip", "sip", "tags",
1489
    "serial_no",
1490
    )
1491

    
1492
  def ExpandNames(self):
1493
    _CheckOutputFields(static=self._FIELDS_STATIC,
1494
                       dynamic=self._FIELDS_DYNAMIC,
1495
                       selected=self.op.output_fields)
1496

    
1497
    self.needed_locks = {}
1498
    self.share_locks[locking.LEVEL_NODE] = 1
1499

    
1500
    if self.op.names:
1501
      self.wanted = _GetWantedNodes(self, self.op.names)
1502
    else:
1503
      self.wanted = locking.ALL_SET
1504

    
1505
    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1506
    if self.do_locking:
1507
      # if we don't request only static fields, we need to lock the nodes
1508
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
1509

    
1510

    
1511
  def CheckPrereq(self):
1512
    """Check prerequisites.
1513

1514
    """
1515
    # The validation of the node list is done in _GetWantedNodes if the
    # list is not empty; if it is empty, there is no validation to do
1517
    pass
1518

    
1519
  def Exec(self, feedback_fn):
1520
    """Computes the list of nodes and their attributes.
1521

1522
    """
1523
    all_info = self.cfg.GetAllNodesInfo()
1524
    if self.do_locking:
1525
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1526
    elif self.wanted != locking.ALL_SET:
1527
      nodenames = self.wanted
1528
      missing = set(nodenames).difference(all_info.keys())
1529
      if missing:
1530
        raise errors.OpExecError(
1531
          "Some nodes were removed before retrieving their data: %s" % missing)
1532
    else:
1533
      nodenames = all_info.keys()
1534

    
1535
    nodenames = utils.NiceSort(nodenames)
1536
    nodelist = [all_info[name] for name in nodenames]
1537

    
1538
    # begin data gathering
1539

    
1540
    if self.do_locking:
1541
      live_data = {}
1542
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1543
                                          self.cfg.GetHypervisorType())
1544
      for name in nodenames:
1545
        nodeinfo = node_data.get(name, None)
1546
        if nodeinfo:
1547
          live_data[name] = {
1548
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1549
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1550
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1551
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1552
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1553
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1554
            "bootid": nodeinfo['bootid'],
1555
            }
1556
        else:
1557
          live_data[name] = {}
1558
    else:
1559
      live_data = dict.fromkeys(nodenames, {})
1560

    
1561
    node_to_primary = dict([(name, set()) for name in nodenames])
1562
    node_to_secondary = dict([(name, set()) for name in nodenames])
1563

    
1564
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1565
                             "sinst_cnt", "sinst_list"))
1566
    if inst_fields & frozenset(self.op.output_fields):
1567
      instancelist = self.cfg.GetInstanceList()
1568

    
1569
      for instance_name in instancelist:
1570
        inst = self.cfg.GetInstanceInfo(instance_name)
1571
        if inst.primary_node in node_to_primary:
1572
          node_to_primary[inst.primary_node].add(inst.name)
1573
        for secnode in inst.secondary_nodes:
1574
          if secnode in node_to_secondary:
1575
            node_to_secondary[secnode].add(inst.name)
1576

    
1577
    # end data gathering
1578

    
1579
    output = []
1580
    for node in nodelist:
1581
      node_output = []
1582
      for field in self.op.output_fields:
1583
        if field == "name":
1584
          val = node.name
1585
        elif field == "pinst_list":
1586
          val = list(node_to_primary[node.name])
1587
        elif field == "sinst_list":
1588
          val = list(node_to_secondary[node.name])
1589
        elif field == "pinst_cnt":
1590
          val = len(node_to_primary[node.name])
1591
        elif field == "sinst_cnt":
1592
          val = len(node_to_secondary[node.name])
1593
        elif field == "pip":
1594
          val = node.primary_ip
1595
        elif field == "sip":
1596
          val = node.secondary_ip
1597
        elif field == "tags":
1598
          val = list(node.GetTags())
1599
        elif field == "serial_no":
1600
          val = node.serial_no
1601
        elif self._FIELDS_DYNAMIC.Matches(field):
1602
          val = live_data[node.name].get(field, None)
1603
        else:
1604
          raise errors.ParameterError(field)
1605
        node_output.append(val)
1606
      output.append(node_output)
1607

    
1608
    return output
1609

    
1610

    
1611
class LUQueryNodeVolumes(NoHooksLU):
1612
  """Logical unit for getting volumes on node(s).
1613

1614
  """
1615
  _OP_REQP = ["nodes", "output_fields"]
1616
  REQ_BGL = False
1617
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1618
  _FIELDS_STATIC = utils.FieldSet("node")
1619

    
1620
  def ExpandNames(self):
1621
    _CheckOutputFields(static=self._FIELDS_STATIC,
1622
                       dynamic=self._FIELDS_DYNAMIC,
1623
                       selected=self.op.output_fields)
1624

    
1625
    self.needed_locks = {}
1626
    self.share_locks[locking.LEVEL_NODE] = 1
1627
    if not self.op.nodes:
1628
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1629
    else:
1630
      self.needed_locks[locking.LEVEL_NODE] = \
1631
        _GetWantedNodes(self, self.op.nodes)
1632

    
1633
  def CheckPrereq(self):
1634
    """Check prerequisites.
1635

1636
    This checks that the fields required are valid output fields.
1637

1638
    """
1639
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1640

    
1641
  def Exec(self, feedback_fn):
1642
    """Computes the list of nodes and their attributes.
1643

1644
    """
1645
    nodenames = self.nodes
1646
    volumes = self.rpc.call_node_volumes(nodenames)
1647

    
1648
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1649
             in self.cfg.GetInstanceList()]
1650

    
1651
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1652

    
1653
    output = []
1654
    for node in nodenames:
1655
      if node not in volumes or not volumes[node]:
1656
        continue
1657

    
1658
      node_vols = volumes[node][:]
1659
      node_vols.sort(key=lambda vol: vol['dev'])
1660

    
1661
      for vol in node_vols:
1662
        node_output = []
1663
        for field in self.op.output_fields:
1664
          if field == "node":
1665
            val = node
1666
          elif field == "phys":
1667
            val = vol['dev']
1668
          elif field == "vg":
1669
            val = vol['vg']
1670
          elif field == "name":
1671
            val = vol['name']
1672
          elif field == "size":
1673
            val = int(float(vol['size']))
1674
          elif field == "instance":
1675
            for inst in ilist:
1676
              if node not in lv_by_node[inst]:
1677
                continue
1678
              if vol['name'] in lv_by_node[inst][node]:
1679
                val = inst.name
1680
                break
1681
            else:
1682
              val = '-'
1683
          else:
1684
            raise errors.ParameterError(field)
1685
          node_output.append(str(val))
1686

    
1687
        output.append(node_output)
1688

    
1689
    return output
1690

    
1691

    
1692
class LUAddNode(LogicalUnit):
1693
  """Logical unit for adding node to the cluster.
1694

1695
  """
1696
  HPATH = "node-add"
1697
  HTYPE = constants.HTYPE_NODE
1698
  _OP_REQP = ["node_name"]
1699

    
1700
  def BuildHooksEnv(self):
1701
    """Build hooks env.
1702

1703
    This will run on all nodes before, and on all nodes + the new node after.
1704

1705
    """
1706
    env = {
1707
      "OP_TARGET": self.op.node_name,
1708
      "NODE_NAME": self.op.node_name,
1709
      "NODE_PIP": self.op.primary_ip,
1710
      "NODE_SIP": self.op.secondary_ip,
1711
      }
1712
    nodes_0 = self.cfg.GetNodeList()
1713
    nodes_1 = nodes_0 + [self.op.node_name, ]
1714
    return env, nodes_0, nodes_1
1715

    
1716
  def CheckPrereq(self):
1717
    """Check prerequisites.
1718

1719
    This checks:
1720
     - the new node is not already in the config
1721
     - it is resolvable
1722
     - its parameters (single/dual homed) matches the cluster
1723

1724
    Any errors are signalled by raising errors.OpPrereqError.
1725

1726
    """
1727
    node_name = self.op.node_name
1728
    cfg = self.cfg
1729

    
1730
    dns_data = utils.HostInfo(node_name)
1731

    
1732
    node = dns_data.name
1733
    primary_ip = self.op.primary_ip = dns_data.ip
1734
    secondary_ip = getattr(self.op, "secondary_ip", None)
1735
    if secondary_ip is None:
1736
      secondary_ip = primary_ip
1737
    if not utils.IsValidIP(secondary_ip):
1738
      raise errors.OpPrereqError("Invalid secondary IP given")
1739
    self.op.secondary_ip = secondary_ip
1740

    
1741
    node_list = cfg.GetNodeList()
1742
    if not self.op.readd and node in node_list:
1743
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1744
                                 node)
1745
    elif self.op.readd and node not in node_list:
1746
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1747

    
1748
    for existing_node_name in node_list:
1749
      existing_node = cfg.GetNodeInfo(existing_node_name)
1750

    
1751
      if self.op.readd and node == existing_node_name:
1752
        if (existing_node.primary_ip != primary_ip or
1753
            existing_node.secondary_ip != secondary_ip):
1754
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1755
                                     " address configuration as before")
1756
        continue
1757

    
1758
      if (existing_node.primary_ip == primary_ip or
1759
          existing_node.secondary_ip == primary_ip or
1760
          existing_node.primary_ip == secondary_ip or
1761
          existing_node.secondary_ip == secondary_ip):
1762
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1763
                                   " existing node %s" % existing_node.name)
1764

    
1765
    # check that the type of the node (single versus dual homed) is the
1766
    # same as for the master
1767
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1768
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1769
    newbie_singlehomed = secondary_ip == primary_ip
1770
    if master_singlehomed != newbie_singlehomed:
1771
      if master_singlehomed:
1772
        raise errors.OpPrereqError("The master has no private ip but the"
1773
                                   " new node has one")
1774
      else:
1775
        raise errors.OpPrereqError("The master has a private ip but the"
1776
                                   " new node doesn't have one")
1777

    
1778
    # checks reachability
1779
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1780
      raise errors.OpPrereqError("Node not reachable by ping")
1781

    
1782
    if not newbie_singlehomed:
1783
      # check reachability from my secondary ip to newbie's secondary ip
1784
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1785
                           source=myself.secondary_ip):
1786
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1787
                                   " based ping to noded port")
1788

    
1789
    self.new_node = objects.Node(name=node,
1790
                                 primary_ip=primary_ip,
1791
                                 secondary_ip=secondary_ip)
1792

    
1793
  def Exec(self, feedback_fn):
1794
    """Adds the new node to the cluster.
1795

1796
    """
1797
    new_node = self.new_node
1798
    node = new_node.name
1799

    
1800
    # check connectivity
1801
    result = self.rpc.call_version([node])[node]
1802
    if result:
1803
      if constants.PROTOCOL_VERSION == result:
1804
        logging.info("Communication to node %s fine, sw version %s match",
1805
                     node, result)
1806
      else:
1807
        raise errors.OpExecError("Version mismatch master version %s,"
1808
                                 " node version %s" %
1809
                                 (constants.PROTOCOL_VERSION, result))
1810
    else:
1811
      raise errors.OpExecError("Cannot get version from the new node")
1812

    
1813
    # setup ssh on node
1814
    logging.info("Copy ssh key to node %s", node)
1815
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1816
    keyarray = []
1817
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1818
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1819
                priv_key, pub_key]
1820

    
1821
    for i in keyfiles:
1822
      f = open(i, 'r')
1823
      try:
1824
        keyarray.append(f.read())
1825
      finally:
1826
        f.close()
1827

    
1828
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
1829
                                    keyarray[2],
1830
                                    keyarray[3], keyarray[4], keyarray[5])
1831

    
1832
    if not result:
1833
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1834

    
1835
    # Add node to our /etc/hosts, and add key to known_hosts
1836
    utils.AddHostToEtcHosts(new_node.name)
1837

    
1838
    if new_node.secondary_ip != new_node.primary_ip:
1839
      if not self.rpc.call_node_has_ip_address(new_node.name,
1840
                                               new_node.secondary_ip):
1841
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1842
                                 " you gave (%s). Please fix and re-run this"
1843
                                 " command." % new_node.secondary_ip)
1844

    
1845
    node_verify_list = [self.cfg.GetMasterNode()]
1846
    node_verify_param = {
1847
      'nodelist': [node],
1848
      # TODO: do a node-net-test as well?
1849
    }
1850

    
1851
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
1852
                                       self.cfg.GetClusterName())
1853
    for verifier in node_verify_list:
1854
      if not result[verifier]:
1855
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
1856
                                 " for remote verification" % verifier)
1857
      if result[verifier]['nodelist']:
1858
        for failed in result[verifier]['nodelist']:
1859
          feedback_fn("ssh/hostname verification failed %s -> %s" %
1860
                      (verifier, result[verifier]['nodelist'][failed]))
1861
        raise errors.OpExecError("ssh/hostname verification failed.")
1862

    
1863
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1864
    # including the node just added
1865
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
1866
    dist_nodes = self.cfg.GetNodeList()
1867
    if not self.op.readd:
1868
      dist_nodes.append(node)
1869
    if myself.name in dist_nodes:
1870
      dist_nodes.remove(myself.name)
1871

    
1872
    logging.debug("Copying hosts and known_hosts to all nodes")
1873
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1874
      result = self.rpc.call_upload_file(dist_nodes, fname)
1875
      for to_node in dist_nodes:
1876
        if not result[to_node]:
1877
          logging.error("Copy of file %s to node %s failed", fname, to_node)
1878

    
1879
    to_copy = []
1880
    if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
1881
      to_copy.append(constants.VNC_PASSWORD_FILE)
1882
    for fname in to_copy:
1883
      result = self.rpc.call_upload_file([node], fname)
1884
      if not result[node]:
1885
        logging.error("Could not copy file %s to node %s", fname, node)
1886

    
1887
    if self.op.readd:
1888
      self.context.ReaddNode(new_node)
1889
    else:
1890
      self.context.AddNode(new_node)
1891

    
1892

    
1893
class LUQueryClusterInfo(NoHooksLU):
1894
  """Query cluster configuration.
1895

1896
  """
1897
  _OP_REQP = []
1898
  REQ_MASTER = False
1899
  REQ_BGL = False
1900

    
1901
  def ExpandNames(self):
1902
    self.needed_locks = {}
1903

    
1904
  def CheckPrereq(self):
    """No prerequisites needed for this LU.
1906

1907
    """
1908
    pass
1909

    
1910
  def Exec(self, feedback_fn):
1911
    """Return cluster config.
1912

1913
    """
1914
    cluster = self.cfg.GetClusterInfo()
1915
    result = {
1916
      "software_version": constants.RELEASE_VERSION,
1917
      "protocol_version": constants.PROTOCOL_VERSION,
1918
      "config_version": constants.CONFIG_VERSION,
1919
      "os_api_version": constants.OS_API_VERSION,
1920
      "export_version": constants.EXPORT_VERSION,
1921
      "architecture": (platform.architecture()[0], platform.machine()),
1922
      "name": cluster.cluster_name,
1923
      "master": cluster.master_node,
1924
      "default_hypervisor": cluster.default_hypervisor,
1925
      "enabled_hypervisors": cluster.enabled_hypervisors,
1926
      "hvparams": cluster.hvparams,
1927
      "beparams": cluster.beparams,
1928
      }
1929

    
1930
    return result
1931

    
1932

    
1933
class LUQueryConfigValues(NoHooksLU):
1934
  """Return configuration values.
1935

1936
  """
1937
  _OP_REQP = []
1938
  REQ_BGL = False
1939
  _FIELDS_DYNAMIC = utils.FieldSet()
1940
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
1941

    
1942
  def ExpandNames(self):
1943
    self.needed_locks = {}
1944

    
1945
    _CheckOutputFields(static=self._FIELDS_STATIC,
1946
                       dynamic=self._FIELDS_DYNAMIC,
1947
                       selected=self.op.output_fields)
1948

    
1949
  def CheckPrereq(self):
1950
    """No prerequisites.
1951

1952
    """
1953
    pass
1954

    
1955
  def Exec(self, feedback_fn):
1956
    """Dump a representation of the cluster config to the standard output.
1957

1958
    """
1959
    values = []
1960
    for field in self.op.output_fields:
1961
      if field == "cluster_name":
1962
        entry = self.cfg.GetClusterName()
1963
      elif field == "master_node":
1964
        entry = self.cfg.GetMasterNode()
1965
      elif field == "drain_flag":
1966
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1967
      else:
1968
        raise errors.ParameterError(field)
1969
      values.append(entry)
1970
    return values
1971

    
1972

    
1973
class LUActivateInstanceDisks(NoHooksLU):
1974
  """Bring up an instance's disks.
1975

1976
  """
1977
  _OP_REQP = ["instance_name"]
1978
  REQ_BGL = False
1979

    
1980
  def ExpandNames(self):
1981
    self._ExpandAndLockInstance()
1982
    self.needed_locks[locking.LEVEL_NODE] = []
1983
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1984

    
1985
  def DeclareLocks(self, level):
1986
    if level == locking.LEVEL_NODE:
1987
      self._LockInstancesNodes()
1988

    
1989
  def CheckPrereq(self):
1990
    """Check prerequisites.
1991

1992
    This checks that the instance is in the cluster.
1993

1994
    """
1995
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1996
    assert self.instance is not None, \
1997
      "Cannot retrieve locked instance %s" % self.op.instance_name
1998

    
1999
  def Exec(self, feedback_fn):
2000
    """Activate the disks.
2001

2002
    """
2003
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2004
    if not disks_ok:
2005
      raise errors.OpExecError("Cannot activate block devices")
2006

    
2007
    return disks_info
2008

    
2009

    
2010
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2011
  """Prepare the block devices for an instance.
2012

2013
  This sets up the block devices on all nodes.
2014

2015
  Args:
2016
    instance: a ganeti.objects.Instance object
2017
    ignore_secondaries: if true, errors on secondary nodes won't result
2018
                        in an error return from the function
2019

2020
  Returns:
    a tuple (disks_ok, device_info), where disks_ok is false if the
    operation failed, and device_info is a list of
    (host, instance_visible_name, node_visible_name) tuples with the
    mapping from node devices to instance devices
2024
  """
2025
  device_info = []
2026
  disks_ok = True
2027
  iname = instance.name
2028
  # With the two passes mechanism we try to reduce the window of
2029
  # opportunity for the race condition of switching DRBD to primary
2030
  # before handshaking occurred, but we do not eliminate it
2031

    
2032
  # The proper fix would be to wait (with some limits) until the
2033
  # connection has been made and drbd transitions from WFConnection
2034
  # into any other network-connected state (Connected, SyncTarget,
2035
  # SyncSource, etc.)
2036

    
2037
  # 1st pass, assemble on all nodes in secondary mode
2038
  for inst_disk in instance.disks:
2039
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2040
      lu.cfg.SetDiskID(node_disk, node)
2041
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2042
      if not result:
2043
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2044
                           " (is_primary=False, pass=1)",
2045
                           inst_disk.iv_name, node)
2046
        if not ignore_secondaries:
2047
          disks_ok = False
2048

    
2049
  # FIXME: race condition on drbd migration to primary
2050

    
2051
  # 2nd pass, do only the primary node
2052
  for inst_disk in instance.disks:
2053
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2054
      if node != instance.primary_node:
2055
        continue
2056
      lu.cfg.SetDiskID(node_disk, node)
2057
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2058
      if not result:
2059
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2060
                           " (is_primary=True, pass=2)",
2061
                           inst_disk.iv_name, node)
2062
        disks_ok = False
2063
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
2064

    
2065
  # leave the disks configured for the primary node
2066
  # this is a workaround that would be fixed better by
2067
  # improving the logical/physical id handling
2068
  for disk in instance.disks:
2069
    lu.cfg.SetDiskID(disk, instance.primary_node)
2070

    
2071
  return disks_ok, device_info
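# A minimal usage sketch for _AssembleInstanceDisks (illustrative only; the
# real caller is LUActivateInstanceDisks.Exec above):
#
#   disks_ok, device_info = _AssembleInstanceDisks(self, self.instance)
#   if not disks_ok:
#     raise errors.OpExecError("Cannot activate block devices")
#   # device_info holds (host, instance_visible_name, node_visible_name)
#   # tuples for the disks assembled on the primary node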
2072

    
2073

    
2074
def _StartInstanceDisks(lu, instance, force):
2075
  """Start the disks of an instance.
2076

2077
  """
2078
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2079
                                           ignore_secondaries=force)
2080
  if not disks_ok:
2081
    _ShutdownInstanceDisks(lu, instance)
2082
    if force is not None and not force:
2083
      lu.proc.LogWarning("", hint="If the message above refers to a"
2084
                         " secondary node,"
2085
                         " you can retry the operation using '--force'.")
2086
    raise errors.OpExecError("Disk consistency error")
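# Sketch of the usual _StartInstanceDisks call pattern (illustrative; this is
# what LUReinstallInstance.Exec and LURenameInstance.Exec below do): bring the
# disks up, do the work, and always shut the disks down again on the way out.
#
#   _StartInstanceDisks(self, inst, None)
#   try:
#     ...  # e.g. run the OS scripts via self.rpc
#   finally:
#     _ShutdownInstanceDisks(self, inst)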
2087

    
2088

    
2089
class LUDeactivateInstanceDisks(NoHooksLU):
2090
  """Shutdown an instance's disks.
2091

2092
  """
2093
  _OP_REQP = ["instance_name"]
2094
  REQ_BGL = False
2095

    
2096
  def ExpandNames(self):
2097
    self._ExpandAndLockInstance()
2098
    self.needed_locks[locking.LEVEL_NODE] = []
2099
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2100

    
2101
  def DeclareLocks(self, level):
2102
    if level == locking.LEVEL_NODE:
2103
      self._LockInstancesNodes()
2104

    
2105
  def CheckPrereq(self):
2106
    """Check prerequisites.
2107

2108
    This checks that the instance is in the cluster.
2109

2110
    """
2111
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2112
    assert self.instance is not None, \
2113
      "Cannot retrieve locked instance %s" % self.op.instance_name
2114

    
2115
  def Exec(self, feedback_fn):
2116
    """Deactivate the disks
2117

2118
    """
2119
    instance = self.instance
2120
    _SafeShutdownInstanceDisks(self, instance)
2121

    
2122

    
2123
def _SafeShutdownInstanceDisks(lu, instance):
2124
  """Shutdown block devices of an instance.
2125

2126
  This function checks if an instance is running, before calling
2127
  _ShutdownInstanceDisks.
2128

2129
  """
2130
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
2131
                                      [instance.hypervisor])
2132
  ins_l = ins_l[instance.primary_node]
2133
  if not isinstance(ins_l, list):
2134
    raise errors.OpExecError("Can't contact node '%s'" %
2135
                             instance.primary_node)
2136

    
2137
  if instance.name in ins_l:
2138
    raise errors.OpExecError("Instance is running, can't shutdown"
2139
                             " block devices.")
2140

    
2141
  _ShutdownInstanceDisks(lu, instance)
2142

    
2143

    
2144
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2145
  """Shutdown block devices of an instance.
2146

2147
  This does the shutdown on all nodes of the instance.
2148

2149
  If ignore_primary is true, errors on the primary node are ignored;
  otherwise they cause the shutdown to be reported as failed.
2151

2152
  """
2153
  result = True
2154
  for disk in instance.disks:
2155
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2156
      lu.cfg.SetDiskID(top_disk, node)
2157
      if not lu.rpc.call_blockdev_shutdown(node, top_disk):
2158
        logging.error("Could not shutdown block device %s on node %s",
2159
                      disk.iv_name, node)
2160
        if not ignore_primary or node != instance.primary_node:
2161
          result = False
2162
  return result
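# Usage note (illustrative): callers that are moving away from the current
# primary node anyway, such as LUFailoverInstance.Exec below, pass
# ignore_primary=True so that shutdown errors on the old primary do not make
# the whole operation fail:
#
#   if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
#     raise errors.OpExecError("Can't shut down the instance's disks.")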
2163

    
2164

    
2165
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2166
  """Checks if a node has enough free memory.
2167

2168
  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.
2172

2173
  @type lu: C{LogicalUnit}
2174
  @param lu: a logical unit from which we get configuration data
2175
  @type node: C{str}
2176
  @param node: the node to check
2177
  @type reason: C{str}
2178
  @param reason: string to use in the error message
2179
  @type requested: C{int}
2180
  @param requested: the amount of memory in MiB to check for
2181
  @type hypervisor: C{str}
2182
  @param hypervisor: the hypervisor to ask for memory stats
2183
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2184
      we cannot check the node
2185

2186
  """
2187
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2188
  if not nodeinfo or not isinstance(nodeinfo, dict):
2189
    raise errors.OpPrereqError("Could not contact node %s for resource"
2190
                             " information" % (node,))
2191

    
2192
  free_mem = nodeinfo[node].get('memory_free')
2193
  if not isinstance(free_mem, int):
2194
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2195
                             " was '%s'" % (node, free_mem))
2196
  if requested > free_mem:
2197
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2198
                             " needed %s MiB, available %s MiB" %
2199
                             (node, reason, requested, free_mem))
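# A short usage sketch for _CheckNodeFreeMemory (illustrative; it mirrors
# LUStartupInstance.CheckPrereq below): verify that the primary node can hold
# the instance's configured memory before starting it.
#
#   bep = self.cfg.GetClusterInfo().FillBE(instance)
#   _CheckNodeFreeMemory(self, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        bep[constants.BE_MEMORY], instance.hypervisor)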
2200

    
2201

    
2202
class LUStartupInstance(LogicalUnit):
2203
  """Starts an instance.
2204

2205
  """
2206
  HPATH = "instance-start"
2207
  HTYPE = constants.HTYPE_INSTANCE
2208
  _OP_REQP = ["instance_name", "force"]
2209
  REQ_BGL = False
2210

    
2211
  def ExpandNames(self):
2212
    self._ExpandAndLockInstance()
2213
    self.needed_locks[locking.LEVEL_NODE] = []
2214
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2215

    
2216
  def DeclareLocks(self, level):
2217
    if level == locking.LEVEL_NODE:
2218
      self._LockInstancesNodes()
2219

    
2220
  def BuildHooksEnv(self):
2221
    """Build hooks env.
2222

2223
    This runs on master, primary and secondary nodes of the instance.
2224

2225
    """
2226
    env = {
2227
      "FORCE": self.op.force,
2228
      }
2229
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2230
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2231
          list(self.instance.secondary_nodes))
2232
    return env, nl, nl
2233

    
2234
  def CheckPrereq(self):
2235
    """Check prerequisites.
2236

2237
    This checks that the instance is in the cluster.
2238

2239
    """
2240
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2241
    assert self.instance is not None, \
2242
      "Cannot retrieve locked instance %s" % self.op.instance_name
2243

    
2244
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2245
    # check bridges existence
2246
    _CheckInstanceBridgesExist(self, instance)
2247

    
2248
    _CheckNodeFreeMemory(self, instance.primary_node,
2249
                         "starting instance %s" % instance.name,
2250
                         bep[constants.BE_MEMORY], instance.hypervisor)
2251

    
2252
  def Exec(self, feedback_fn):
2253
    """Start the instance.
2254

2255
    """
2256
    instance = self.instance
2257
    force = self.op.force
2258
    extra_args = getattr(self.op, "extra_args", "")
2259

    
2260
    self.cfg.MarkInstanceUp(instance.name)
2261

    
2262
    node_current = instance.primary_node
2263

    
2264
    _StartInstanceDisks(self, instance, force)
2265

    
2266
    if not self.rpc.call_instance_start(node_current, instance, extra_args):
2267
      _ShutdownInstanceDisks(self, instance)
2268
      raise errors.OpExecError("Could not start instance")
2269

    
2270

    
2271
class LURebootInstance(LogicalUnit):
2272
  """Reboot an instance.
2273

2274
  """
2275
  HPATH = "instance-reboot"
2276
  HTYPE = constants.HTYPE_INSTANCE
2277
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2278
  REQ_BGL = False
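  # Note on reboot semantics (descriptive only, matching Exec below):
  # INSTANCE_REBOOT_SOFT and INSTANCE_REBOOT_HARD are handed to the
  # hypervisor via call_instance_reboot, while INSTANCE_REBOOT_FULL is
  # implemented master-side as shutdown + disk cycle + start.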
2279

    
2280
  def ExpandNames(self):
2281
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2282
                                   constants.INSTANCE_REBOOT_HARD,
2283
                                   constants.INSTANCE_REBOOT_FULL]:
2284
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2285
                                  (constants.INSTANCE_REBOOT_SOFT,
2286
                                   constants.INSTANCE_REBOOT_HARD,
2287
                                   constants.INSTANCE_REBOOT_FULL))
2288
    self._ExpandAndLockInstance()
2289
    self.needed_locks[locking.LEVEL_NODE] = []
2290
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2291

    
2292
  def DeclareLocks(self, level):
2293
    if level == locking.LEVEL_NODE:
2294
      # only a full reboot touches the disks on the secondary nodes
      primary_only = self.op.reboot_type != constants.INSTANCE_REBOOT_FULL
2295
      self._LockInstancesNodes(primary_only=primary_only)
2296

    
2297
  def BuildHooksEnv(self):
2298
    """Build hooks env.
2299

2300
    This runs on master, primary and secondary nodes of the instance.
2301

2302
    """
2303
    env = {
2304
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2305
      }
2306
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2307
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2308
          list(self.instance.secondary_nodes))
2309
    return env, nl, nl
2310

    
2311
  def CheckPrereq(self):
2312
    """Check prerequisites.
2313

2314
    This checks that the instance is in the cluster.
2315

2316
    """
2317
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2318
    assert self.instance is not None, \
2319
      "Cannot retrieve locked instance %s" % self.op.instance_name
2320

    
2321
    # check bridges existence
2322
    _CheckInstanceBridgesExist(self, instance)
2323

    
2324
  def Exec(self, feedback_fn):
2325
    """Reboot the instance.
2326

2327
    """
2328
    instance = self.instance
2329
    ignore_secondaries = self.op.ignore_secondaries
2330
    reboot_type = self.op.reboot_type
2331
    extra_args = getattr(self.op, "extra_args", "")
2332

    
2333
    node_current = instance.primary_node
2334

    
2335
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2336
                       constants.INSTANCE_REBOOT_HARD]:
2337
      if not self.rpc.call_instance_reboot(node_current, instance,
2338
                                           reboot_type, extra_args):
2339
        raise errors.OpExecError("Could not reboot instance")
2340
    else:
2341
      if not self.rpc.call_instance_shutdown(node_current, instance):
2342
        raise errors.OpExecError("could not shutdown instance for full reboot")
2343
      _ShutdownInstanceDisks(self, instance)
2344
      _StartInstanceDisks(self, instance, ignore_secondaries)
2345
      if not self.rpc.call_instance_start(node_current, instance, extra_args):
2346
        _ShutdownInstanceDisks(self, instance)
2347
        raise errors.OpExecError("Could not start instance for full reboot")
2348

    
2349
    self.cfg.MarkInstanceUp(instance.name)
2350

    
2351

    
2352
class LUShutdownInstance(LogicalUnit):
2353
  """Shutdown an instance.
2354

2355
  """
2356
  HPATH = "instance-stop"
2357
  HTYPE = constants.HTYPE_INSTANCE
2358
  _OP_REQP = ["instance_name"]
2359
  REQ_BGL = False
2360

    
2361
  def ExpandNames(self):
2362
    self._ExpandAndLockInstance()
2363
    self.needed_locks[locking.LEVEL_NODE] = []
2364
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2365

    
2366
  def DeclareLocks(self, level):
2367
    if level == locking.LEVEL_NODE:
2368
      self._LockInstancesNodes()
2369

    
2370
  def BuildHooksEnv(self):
2371
    """Build hooks env.
2372

2373
    This runs on master, primary and secondary nodes of the instance.
2374

2375
    """
2376
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2377
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2378
          list(self.instance.secondary_nodes))
2379
    return env, nl, nl
2380

    
2381
  def CheckPrereq(self):
2382
    """Check prerequisites.
2383

2384
    This checks that the instance is in the cluster.
2385

2386
    """
2387
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2388
    assert self.instance is not None, \
2389
      "Cannot retrieve locked instance %s" % self.op.instance_name
2390

    
2391
  def Exec(self, feedback_fn):
2392
    """Shutdown the instance.
2393

2394
    """
2395
    instance = self.instance
2396
    node_current = instance.primary_node
2397
    self.cfg.MarkInstanceDown(instance.name)
2398
    if not self.rpc.call_instance_shutdown(node_current, instance):
2399
      self.proc.LogWarning("Could not shutdown instance")
2400

    
2401
    _ShutdownInstanceDisks(self, instance)
2402

    
2403

    
2404
class LUReinstallInstance(LogicalUnit):
2405
  """Reinstall an instance.
2406

2407
  """
2408
  HPATH = "instance-reinstall"
2409
  HTYPE = constants.HTYPE_INSTANCE
2410
  _OP_REQP = ["instance_name"]
2411
  REQ_BGL = False
2412

    
2413
  def ExpandNames(self):
2414
    self._ExpandAndLockInstance()
2415
    self.needed_locks[locking.LEVEL_NODE] = []
2416
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2417

    
2418
  def DeclareLocks(self, level):
2419
    if level == locking.LEVEL_NODE:
2420
      self._LockInstancesNodes()
2421

    
2422
  def BuildHooksEnv(self):
2423
    """Build hooks env.
2424

2425
    This runs on master, primary and secondary nodes of the instance.
2426

2427
    """
2428
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2429
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2430
          list(self.instance.secondary_nodes))
2431
    return env, nl, nl
2432

    
2433
  def CheckPrereq(self):
2434
    """Check prerequisites.
2435

2436
    This checks that the instance is in the cluster and is not running.
2437

2438
    """
2439
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2440
    assert instance is not None, \
2441
      "Cannot retrieve locked instance %s" % self.op.instance_name
2442

    
2443
    if instance.disk_template == constants.DT_DISKLESS:
2444
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2445
                                 self.op.instance_name)
2446
    if instance.status != "down":
2447
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2448
                                 self.op.instance_name)
2449
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2450
                                              instance.name,
2451
                                              instance.hypervisor)
2452
    if remote_info:
2453
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2454
                                 (self.op.instance_name,
2455
                                  instance.primary_node))
2456

    
2457
    self.op.os_type = getattr(self.op, "os_type", None)
2458
    if self.op.os_type is not None:
2459
      # OS verification
2460
      pnode = self.cfg.GetNodeInfo(
2461
        self.cfg.ExpandNodeName(instance.primary_node))
2462
      if pnode is None:
2463
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2464
                                   self.op.pnode)
2465
      os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
2466
      if not os_obj:
2467
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2468
                                   " primary node"  % self.op.os_type)
2469

    
2470
    self.instance = instance
2471

    
2472
  def Exec(self, feedback_fn):
2473
    """Reinstall the instance.
2474

2475
    """
2476
    inst = self.instance
2477

    
2478
    if self.op.os_type is not None:
2479
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2480
      inst.os = self.op.os_type
2481
      self.cfg.Update(inst)
2482

    
2483
    _StartInstanceDisks(self, inst, None)
2484
    try:
2485
      feedback_fn("Running the instance OS create scripts...")
2486
      if not self.rpc.call_instance_os_add(inst.primary_node, inst):
2487
        raise errors.OpExecError("Could not install OS for instance %s"
2488
                                 " on node %s" %
2489
                                 (inst.name, inst.primary_node))
2490
    finally:
2491
      _ShutdownInstanceDisks(self, inst)
2492

    
2493

    
2494
class LURenameInstance(LogicalUnit):
2495
  """Rename an instance.
2496

2497
  """
2498
  HPATH = "instance-rename"
2499
  HTYPE = constants.HTYPE_INSTANCE
2500
  _OP_REQP = ["instance_name", "new_name"]
2501

    
2502
  def BuildHooksEnv(self):
2503
    """Build hooks env.
2504

2505
    This runs on master, primary and secondary nodes of the instance.
2506

2507
    """
2508
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2509
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2510
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2511
          list(self.instance.secondary_nodes))
2512
    return env, nl, nl
2513

    
2514
  def CheckPrereq(self):
2515
    """Check prerequisites.
2516

2517
    This checks that the instance is in the cluster and is not running.
2518

2519
    """
2520
    instance = self.cfg.GetInstanceInfo(
2521
      self.cfg.ExpandInstanceName(self.op.instance_name))
2522
    if instance is None:
2523
      raise errors.OpPrereqError("Instance '%s' not known" %
2524
                                 self.op.instance_name)
2525
    if instance.status != "down":
2526
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2527
                                 self.op.instance_name)
2528
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2529
                                              instance.name,
2530
                                              instance.hypervisor)
2531
    if remote_info:
2532
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2533
                                 (self.op.instance_name,
2534
                                  instance.primary_node))
2535
    self.instance = instance
2536

    
2537
    # new name verification
2538
    name_info = utils.HostInfo(self.op.new_name)
2539

    
2540
    self.op.new_name = new_name = name_info.name
2541
    instance_list = self.cfg.GetInstanceList()
2542
    if new_name in instance_list:
2543
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2544
                                 new_name)
2545

    
2546
    if not getattr(self.op, "ignore_ip", False):
2547
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2548
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2549
                                   (name_info.ip, new_name))
2550

    
2551

    
2552
  def Exec(self, feedback_fn):
    """Rename the instance.
2554

2555
    """
2556
    inst = self.instance
2557
    old_name = inst.name
2558

    
2559
    if inst.disk_template == constants.DT_FILE:
2560
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2561

    
2562
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2563
    # Change the instance lock. This is definitely safe while we hold the BGL
2564
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2565
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2566

    
2567
    # re-read the instance from the configuration after rename
2568
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2569

    
2570
    if inst.disk_template == constants.DT_FILE:
2571
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2572
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2573
                                                     old_file_storage_dir,
2574
                                                     new_file_storage_dir)
2575

    
2576
      if not result:
2577
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2578
                                 " directory '%s' to '%s' (but the instance"
2579
                                 " has been renamed in Ganeti)" % (
2580
                                 inst.primary_node, old_file_storage_dir,
2581
                                 new_file_storage_dir))
2582

    
2583
      if not result[0]:
2584
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2585
                                 " (but the instance has been renamed in"
2586
                                 " Ganeti)" % (old_file_storage_dir,
2587
                                               new_file_storage_dir))
2588

    
2589
    _StartInstanceDisks(self, inst, None)
2590
    try:
2591
      if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
2592
                                               old_name):
2593
        msg = ("Could not run OS rename script for instance %s on node %s"
2594
               " (but the instance has been renamed in Ganeti)" %
2595
               (inst.name, inst.primary_node))
2596
        self.proc.LogWarning(msg)
2597
    finally:
2598
      _ShutdownInstanceDisks(self, inst)
2599

    
2600

    
2601
class LURemoveInstance(LogicalUnit):
2602
  """Remove an instance.
2603

2604
  """
2605
  HPATH = "instance-remove"
2606
  HTYPE = constants.HTYPE_INSTANCE
2607
  _OP_REQP = ["instance_name", "ignore_failures"]
2608
  REQ_BGL = False
2609

    
2610
  def ExpandNames(self):
2611
    self._ExpandAndLockInstance()
2612
    self.needed_locks[locking.LEVEL_NODE] = []
2613
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2614

    
2615
  def DeclareLocks(self, level):
2616
    if level == locking.LEVEL_NODE:
2617
      self._LockInstancesNodes()
2618

    
2619
  def BuildHooksEnv(self):
2620
    """Build hooks env.
2621

2622
    This runs on master, primary and secondary nodes of the instance.
2623

2624
    """
2625
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2626
    nl = [self.cfg.GetMasterNode()]
2627
    return env, nl, nl
2628

    
2629
  def CheckPrereq(self):
2630
    """Check prerequisites.
2631

2632
    This checks that the instance is in the cluster.
2633

2634
    """
2635
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2636
    assert self.instance is not None, \
2637
      "Cannot retrieve locked instance %s" % self.op.instance_name
2638

    
2639
  def Exec(self, feedback_fn):
2640
    """Remove the instance.
2641

2642
    """
2643
    instance = self.instance
2644
    logging.info("Shutting down instance %s on node %s",
2645
                 instance.name, instance.primary_node)
2646

    
2647
    if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
2648
      if self.op.ignore_failures:
2649
        feedback_fn("Warning: can't shutdown instance")
2650
      else:
2651
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2652
                                 (instance.name, instance.primary_node))
2653

    
2654
    logging.info("Removing block devices for instance %s", instance.name)
2655

    
2656
    if not _RemoveDisks(self, instance):
2657
      if self.op.ignore_failures:
2658
        feedback_fn("Warning: can't remove instance's disks")
2659
      else:
2660
        raise errors.OpExecError("Can't remove instance's disks")
2661

    
2662
    logging.info("Removing instance %s out of cluster config", instance.name)
2663

    
2664
    self.cfg.RemoveInstance(instance.name)
2665
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2666

    
2667

    
2668
class LUQueryInstances(NoHooksLU):
2669
  """Logical unit for querying instances.
2670

2671
  """
2672
  _OP_REQP = ["output_fields", "names"]
2673
  REQ_BGL = False
2674
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
2675
                                    "admin_state", "admin_ram",
2676
                                    "disk_template", "ip", "mac", "bridge",
2677
                                    "sda_size", "sdb_size", "vcpus", "tags",
2678
                                    "network_port", "beparams",
2679
                                    "(disk).(size)/([0-9]+)",
2680
                                    "(disk).(sizes)",
2681
                                    "(nic).(mac|ip|bridge)/([0-9]+)",
2682
                                    "(nic).(macs|ips|bridges)",
2683
                                    "(disk|nic).(count)",
2684
                                    "serial_no", "hypervisor", "hvparams",] +
2685
                                  ["hv/%s" % name
2686
                                   for name in constants.HVS_PARAMETERS] +
2687
                                  ["be/%s" % name
2688
                                   for name in constants.BES_PARAMETERS])
2689
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
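  # Examples of the parameterized static fields accepted above (illustrative,
  # assuming the usual FieldSet regexp matching): "disk.count", "disk.sizes",
  # "disk.size/0" for the first disk's size, "nic.macs", "nic.mac/1" for the
  # second NIC's MAC, and "hv/<param>" or "be/<param>" for individual
  # hypervisor/backend parameters.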
2690

    
2691

    
2692
  def ExpandNames(self):
2693
    _CheckOutputFields(static=self._FIELDS_STATIC,
2694
                       dynamic=self._FIELDS_DYNAMIC,
2695
                       selected=self.op.output_fields)
2696

    
2697
    self.needed_locks = {}
2698
    self.share_locks[locking.LEVEL_INSTANCE] = 1
2699
    self.share_locks[locking.LEVEL_NODE] = 1
2700

    
2701
    if self.op.names:
2702
      self.wanted = _GetWantedInstances(self, self.op.names)
2703
    else:
2704
      self.wanted = locking.ALL_SET
2705

    
2706
    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2707
    if self.do_locking:
2708
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2709
      self.needed_locks[locking.LEVEL_NODE] = []
2710
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2711

    
2712
  def DeclareLocks(self, level):
2713
    if level == locking.LEVEL_NODE and self.do_locking:
2714
      self._LockInstancesNodes()
2715

    
2716
  def CheckPrereq(self):
2717
    """Check prerequisites.
2718

2719
    """
2720
    pass
2721

    
2722
  def Exec(self, feedback_fn):
2723
    """Computes the list of nodes and their attributes.
2724

2725
    """
2726
    all_info = self.cfg.GetAllInstancesInfo()
2727
    if self.do_locking:
2728
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2729
    elif self.wanted != locking.ALL_SET:
2730
      instance_names = self.wanted
2731
      missing = set(instance_names).difference(all_info.keys())
2732
      if missing:
2733
        raise errors.OpExecError(
2734
          "Some instances were removed before retrieving their data: %s"
2735
          % missing)
2736
    else:
2737
      instance_names = all_info.keys()
2738

    
2739
    instance_names = utils.NiceSort(instance_names)
2740
    instance_list = [all_info[iname] for iname in instance_names]
2741

    
2742
    # begin data gathering
2743

    
2744
    nodes = frozenset([inst.primary_node for inst in instance_list])
2745
    hv_list = list(set([inst.hypervisor for inst in instance_list]))
2746

    
2747
    bad_nodes = []
2748
    if self.do_locking:
2749
      live_data = {}
2750
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
2751
      for name in nodes:
2752
        result = node_data[name]
2753
        if result:
2754
          live_data.update(result)
2755
        elif result == False:
2756
          bad_nodes.append(name)
2757
        # else no instance is alive
2758
    else:
2759
      live_data = dict([(name, {}) for name in instance_names])
2760

    
2761
    # end data gathering
2762

    
2763
    HVPREFIX = "hv/"
2764
    BEPREFIX = "be/"
2765
    output = []
2766
    for instance in instance_list:
2767
      iout = []
2768
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
2769
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
2770
      for field in self.op.output_fields:
2771
        st_match = self._FIELDS_STATIC.Matches(field)
2772
        if field == "name":
2773
          val = instance.name
2774
        elif field == "os":
2775
          val = instance.os
2776
        elif field == "pnode":
2777
          val = instance.primary_node
2778
        elif field == "snodes":
2779
          val = list(instance.secondary_nodes)
2780
        elif field == "admin_state":
2781
          val = (instance.status != "down")
2782
        elif field == "oper_state":
2783
          if instance.primary_node in bad_nodes:
2784
            val = None
2785
          else:
2786
            val = bool(live_data.get(instance.name))
2787
        elif field == "status":
2788
          if instance.primary_node in bad_nodes:
2789
            val = "ERROR_nodedown"
2790
          else:
2791
            running = bool(live_data.get(instance.name))
2792
            if running:
2793
              if instance.status != "down":
2794
                val = "running"
2795
              else:
2796
                val = "ERROR_up"
2797
            else:
2798
              if instance.status != "down":
2799
                val = "ERROR_down"
2800
              else:
2801
                val = "ADMIN_down"
2802
        elif field == "oper_ram":
2803
          if instance.primary_node in bad_nodes:
2804
            val = None
2805
          elif instance.name in live_data:
2806
            val = live_data[instance.name].get("memory", "?")
2807
          else:
2808
            val = "-"
2809
        elif field == "disk_template":
2810
          val = instance.disk_template
2811
        elif field == "ip":
2812
          val = instance.nics[0].ip
2813
        elif field == "bridge":
2814
          val = instance.nics[0].bridge
2815
        elif field == "mac":
2816
          val = instance.nics[0].mac
2817
        elif field == "sda_size" or field == "sdb_size":
2818
          idx = ord(field[2]) - ord('a')
2819
          try:
2820
            val = instance.FindDisk(idx).size
2821
          except errors.OpPrereqError:
2822
            val = None
2823
        elif field == "tags":
2824
          val = list(instance.GetTags())
2825
        elif field == "serial_no":
2826
          val = instance.serial_no
2827
        elif field == "network_port":
2828
          val = instance.network_port
2829
        elif field == "hypervisor":
2830
          val = instance.hypervisor
2831
        elif field == "hvparams":
2832
          val = i_hv
2833
        elif (field.startswith(HVPREFIX) and
2834
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
2835
          val = i_hv.get(field[len(HVPREFIX):], None)
2836
        elif field == "beparams":
2837
          val = i_be
2838
        elif (field.startswith(BEPREFIX) and
2839
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
2840
          val = i_be.get(field[len(BEPREFIX):], None)
2841
        elif st_match and st_match.groups():
2842
          # matches a variable list
2843
          st_groups = st_match.groups()
2844
          if st_groups and st_groups[0] == "disk":
2845
            if st_groups[1] == "count":
2846
              val = len(instance.disks)
2847
            elif st_groups[1] == "sizes":
2848
              val = [disk.size for disk in instance.disks]
2849
            elif st_groups[1] == "size":
2850
              try:
2851
                val = instance.FindDisk(st_groups[2]).size
2852
              except errors.OpPrereqError:
2853
                val = None
2854
            else:
2855
              assert False, "Unhandled disk parameter"
2856
          elif st_groups[0] == "nic":
2857
            if st_groups[1] == "count":
2858
              val = len(instance.nics)
2859
            elif st_groups[1] == "macs":
2860
              val = [nic.mac for nic in instance.nics]
2861
            elif st_groups[1] == "ips":
2862
              val = [nic.ip for nic in instance.nics]
2863
            elif st_groups[1] == "bridges":
2864
              val = [nic.bridge for nic in instance.nics]
2865
            else:
2866
              # index-based item
2867
              nic_idx = int(st_groups[2])
2868
              if nic_idx >= len(instance.nics):
2869
                val = None
2870
              else:
2871
                if st_groups[1] == "mac":
2872
                  val = instance.nics[nic_idx].mac
2873
                elif st_groups[1] == "ip":
2874
                  val = instance.nics[nic_idx].ip
2875
                elif st_groups[1] == "bridge":
2876
                  val = instance.nics[nic_idx].bridge
2877
                else:
2878
                  assert False, "Unhandled NIC parameter"
2879
          else:
2880
            assert False, "Unhandled variable parameter"
2881
        else:
2882
          raise errors.ParameterError(field)
2883
        iout.append(val)
2884
      output.append(iout)
2885

    
2886
    return output
2887

    
2888

    
2889
class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
                         instance.name, bep[constants.BE_MEMORY],
                         instance.hypervisor)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not self.rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s do not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self, dev, target_node, False):
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 instance.name, source_node)

    if not self.rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        self.proc.LogWarning("Could not shut down instance %s on node %s."
                             " Proceeding"
                             " anyway. Please make sure node %s is down",
                             instance.name, source_node, source_node)
      else:
        raise errors.OpExecError("Could not shut down instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logging.info("Starting instance %s on node %s",
                   instance.name, target_node)

      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not self.rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))


def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
        return False

  lu.cfg.SetDiskID(device, node)
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, True, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(lu, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  lu.cfg.SetDiskID(device, node)
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _GenerateUniqueNames(lu, exts):
  """Generate suitable LV names.

  This will generate one unique volume name for each of the given
  extensions, based on newly generated unique IDs.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results


def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
                         p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  port = lu.cfg.AllocatePort()
  vgname = lu.cfg.GetVGName()
  shared_secret = lu.cfg.GenerateDRBDSecret()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
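  # Note: the DRBD8 logical_id built below is the 6-tuple
  # (primary, secondary, port, p_minor, s_minor, shared_secret); the two
  # LVs above (data plus the 128 MB metadata volume) become its children.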
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev


def _GenerateDiskTemplate(lu, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_info,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []
  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(lu, [".disk%d" % i
                                      for i in range(disk_count)])
    for idx, disk in enumerate(disk_info):
      disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
                              logical_id=(vgname, names[idx]),
                              iv_name="disk/%d" % idx)
      disks.append(disk_dev)
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node, remote_node] * len(disk_info), instance_name)
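    # The minor request above interleaves primary and secondary for every
    # disk, so below minors[idx*2] is used as the primary's minor and
    # minors[idx*2+1] as the secondary's minor of disk idx.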

    names = _GenerateUniqueNames(lu,
                                 [".disk%d_%s" % (i, s)
                                  for i in range(disk_count)
                                  for s in ("data", "meta")
                                  ])
    for idx, disk in enumerate(disk_info):
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                      disk["size"], names[idx*2:idx*2+2],
                                      "disk/%d" % idx,
                                      minors[idx*2], minors[idx*2+1])
      disks.append(disk_dev)
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    for idx, disk in enumerate(disk_info):

      disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
                              iv_name="disk/%d" % idx,
                              logical_id=(file_driver,
                                          "%s/disk%d" % (file_storage_dir,
                                                         idx)))
      disks.append(disk_dev)
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks


def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


def _CreateDisks(lu, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
                                                 file_storage_dir)

    if not result:
      logging.error("Could not connect to node '%s'", instance.primary_node)
      return False

    if not result[0]:
      logging.error("Failed to create directory '%s'", file_storage_dir)
      return False

  for device in instance.disks:
    logging.info("Creating volume %s for instance %s",
                 device.iv_name, instance.name)
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
                                        device, False, info):
        logging.error("Failed to create volume %s (%s) on secondary node %s!",
                      device.iv_name, device, secondary_node)
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
                                    instance, device, info):
      logging.error("Failed to create volume %s on primary!", device.iv_name)
      return False

  return True


def _RemoveDisks(lu, instance):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logging.info("Removing block devices for instance %s", instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(disk, node)
      if not lu.rpc.call_blockdev_remove(node, disk):
        lu.proc.LogWarning("Could not remove block device %s on node %s,"
                           " continuing anyway", device.iv_name, node)
        result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
                                               file_storage_dir):
      logging.error("Could not remove directory '%s'", file_storage_dir)
      result = False

  return result


def _ComputeDiskSize(disk_template, disks):
  """Compute disk size requirements in the volume group.

  The result depends on the disk template and the sizes of the
  requested disks.

  """
  # Required free disk space as a function of disk template and disk sizes
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: sum(d["size"] for d in disks),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
    constants.DT_FILE: None,
  }
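  # Illustrative example (sizes assumed): two disks of 1024 MB and 2048 MB
  # need (1024 + 128) + (2048 + 128) = 3328 MB of free VG space under
  # DT_DRBD8, but only 1024 + 2048 = 3072 MB under DT_PLAIN.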

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]


def _CheckHVParams(lu, nodenames, hvname, hvparams):
  """Hypervisor parameter validation.

  This function abstracts the hypervisor parameter validation to be
  used in both instance create and instance modify.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type nodenames: list
  @param nodenames: the list of nodes on which we should check
  @type hvname: string
  @param hvname: the name of the hypervisor we should use
  @type hvparams: dict
  @param hvparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
                                                  hvname,
                                                  hvparams)
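  # Each node is expected to answer with a (success, message) pair; any
  # other reply is treated as a failure to retrieve information from it.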
  for node in nodenames:
    info = hvinfo.get(node, None)
    if not info or not isinstance(info, (tuple, list)):
      raise errors.OpPrereqError("Cannot get current information"
                                 " from node '%s' (%s)" % (node, info))
    if not info[0]:
      raise errors.OpPrereqError("Hypervisor parameter validation failed:"
                                 " %s" % info[1])


class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disks", "disk_template",
              "mode", "start",
              "wait_for_sync", "ip_check", "nics",
              "hvparams", "beparams"]
  REQ_BGL = False

  def _ExpandNode(self, node):
    """Expands and checks one node name.

    """
    node_full = self.cfg.ExpandNodeName(node)
    if node_full is None:
      raise errors.OpPrereqError("Unknown node %s" % node)
    return node_full

  def ExpandNames(self):
    """ExpandNames for CreateInstance.

    Figure out the right locks for instance creation.

    """
    self.needed_locks = {}

    # set optional parameters to None if they don't exist
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    # cheap checks, mostly valid constants given

    # verify creation mode
    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    if self.op.hypervisor is None:
      self.op.hypervisor = self.cfg.GetHypervisorType()

    cluster = self.cfg.GetClusterInfo()
    enabled_hvs = cluster.enabled_hypervisors
    if self.op.hypervisor not in enabled_hvs:
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
                                 " cluster (%s)" % (self.op.hypervisor,
                                  ",".join(enabled_hvs)))

    # check hypervisor parameter syntax (locally)

    filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
                                  self.op.hvparams)
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
    hv_type.CheckParameterSyntax(filled_hvp)

    # fill and remember the beparams dict
    self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                    self.op.beparams)

    #### instance parameters check

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)
    self.op.instance_name = instance_name = hostname1.name

    # this is just a preventive check, but someone might still add this
    # instance in the meantime, and creation will fail at lock-add time
    if instance_name in self.cfg.GetInstanceList():
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    self.add_locks[locking.LEVEL_INSTANCE] = instance_name

    # NIC buildup
    self.nics = []
    for nic in self.op.nics:
      # ip validity checks
      ip = nic.get("ip", None)
      if ip is None or ip.lower() == "none":
        nic_ip = None
      elif ip.lower() == constants.VALUE_AUTO:
        nic_ip = hostname1.ip
      else:
        if not utils.IsValidIP(ip):
          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
                                     " like a valid IP" % ip)
        nic_ip = ip

      # MAC address verification
      mac = nic.get("mac", constants.VALUE_AUTO)
      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
        if not utils.IsValidMac(mac.lower()):
          raise errors.OpPrereqError("Invalid MAC address specified: %s" %
                                     mac)
      # bridge verification
      bridge = nic.get("bridge", self.cfg.GetDefBridge())
      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))

    # disk checks/pre-build
    self.disks = []
    for disk in self.op.disks:
      mode = disk.get("mode", constants.DISK_RDWR)
      if mode not in constants.DISK_ACCESS_SET:
        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                   mode)
      size = disk.get("size", None)
      if size is None:
        raise errors.OpPrereqError("Missing disk size")
      try:
        size = int(size)
      except ValueError:
        raise errors.OpPrereqError("Invalid disk size '%s'" % size)
      self.disks.append({"size": size, "mode": mode})

    # used in CheckPrereq for ip ping check
    self.check_ip = hostname1.ip

    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory path not absolute")

    ### Node/iallocator related checks
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.op.pnode = self._ExpandNode(self.op.pnode)
      nodelist = [self.op.pnode]
      if self.op.snode is not None:
        self.op.snode = self._ExpandNode(self.op.snode)
        nodelist.append(self.op.snode)
      self.needed_locks[locking.LEVEL_NODE] = nodelist

    # in case of import lock the source node too
    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)

      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      self.op.src_node = src_node = self._ExpandNode(src_node)
      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
        self.needed_locks[locking.LEVEL_NODE].append(src_node)

    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    nics = [n.ToDict() for n in self.nics]
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.be_full[constants.BE_VCPUS],
                     mem_size=self.be_full[constants.BE_MEMORY],
                     disks=self.disks,
                     nics=nics,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 ", ".join(ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGES"] = self.src_images

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.be_full[constants.BE_MEMORY],
      vcpus=self.be_full[constants.BE_VCPUS],
      nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
    ))

    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl


  def CheckPrereq(self):
    """Check prerequisites.

    """
    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")


    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = self.op.src_node
      src_path = self.op.src_path

      export_info = self.rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      # Check that the new instance doesn't have less disks than the export
      instance_disks = len(self.disks)
      export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
      if instance_disks < export_disks:
        raise errors.OpPrereqError("Not enough disks to import."
                                   " (instance: %d, export: %d)" %
                                   (instance_disks, export_disks))

      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      disk_images = []
      for idx in range(export_disks):
        option = 'disk%d_dump' % idx
        if export_info.has_option(constants.INISECT_INS, option):
          # FIXME: are the old os-es, disk sizes, etc. useful?
          export_name = export_info.get(constants.INISECT_INS, option)
          image = os.path.join(src_path, export_name)
          disk_images.append(image)
        else:
          disk_images.append(False)

      self.src_images = disk_images

      if self.op.mac == constants.VALUE_AUTO:
        old_name = export_info.get(constants.INISECT_INS, 'name')
        if self.op.instance_name == old_name:
          # FIXME: adjust every nic, when we'll be able to create instances
          # with more than one
          if int(export_info.get(constants.INISECT_INS, 'nic_count')) >= 1:
            self.op.mac = export_info.get(constants.INISECT_INS, 'nic_0_mac')

    # ip ping checks (we use the same ip that was resolved in ExpandNames)

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (self.check_ip, self.op.instance_name))

    #### allocator run

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
    assert self.pnode is not None, \
      "Cannot retrieve locked node %s" % self.op.pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if self.op.snode is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")
      if self.op.snode == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(self.op.snode)

    nodenames = [pnode.name] + self.secondaries

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.disks)

    # Check lv size requirements
    if req_size is not None:
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                         self.op.hypervisor)
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)

    # os verification
    os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node" % self.op.os_type)

    # bridge check on primary node
    bridges = [n.bridge for n in self.nics]
    if not self.rpc.call_bridges_exist(self.pnode.name, bridges):
      raise errors.OpPrereqError("one of the target bridges '%s' does not"
                                 " exist on"
                                 " destination node '%s'" %
                                 (",".join(bridges), pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.be_full[constants.BE_MEMORY],
                           self.op.hypervisor)

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    for nic in self.nics:
      if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
        nic.mac = self.cfg.GenerateMAC()

    ht_kind = self.op.hypervisor
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    ##if self.op.vnc_bind_address is None:
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.cfg.GetFileStorageDir(),
                                        string_file_storage_dir, instance))


    disks = _GenerateDiskTemplate(self,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries,
                                  self.disks,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            nics=self.nics, disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            beparams=self.op.beparams,
                            hvparams=self.op.hvparams,
                            hypervisor=self.op.hypervisor,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self, iobj):
      _RemoveDisks(self, iobj)
      self.cfg.ReleaseDRBDMinors(instance)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Declare that we don't want to remove the instance lock anymore, as we've
    # added the instance to the config
    del self.remove_locks[locking.LEVEL_INSTANCE]
    # Remove the temp. assignments for the instance's drbds
    self.cfg.ReleaseDRBDMinors(instance)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, iobj)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(self, iobj)
      self.cfg.RemoveInstance(iobj.name)
      # Make sure the instance lock gets removed
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not self.rpc.call_instance_os_add(pnode_name, iobj):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_images = self.src_images
        cluster_name = self.cfg.GetClusterName()
        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
                                                         src_node, src_images,
                                                         cluster_name)
        for idx, result in enumerate(import_result):
          if not result:
            self.LogWarning("Could not import image %s for instance %s,"
                            " disk %d, on node %s" % (src_images[idx],
                                                      instance, idx,
                                                      pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logging.info("Starting instance %s on node %s", instance, pnode_name)
      feedback_fn("* starting instance...")
      if not self.rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Connect to the console of an instance.

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = self.rpc.call_instance_list([node],
                                             [instance.hypervisor])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logging.debug("Connecting to console of %s on %s", instance.name, node)

    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    console_cmd = hyper.GetShellCommandForConsole(instance)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    elif self.op.remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.op.remote_node = remote_node
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    # If we're not already locking all nodes in the set we have to declare the
    # instance's primary/secondary nodes.
    if (level == locking.LEVEL_NODE and
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
      self._LockInstancesNodes()

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    self.LogInfo("Selected new secondary for the instance: %s",
                 self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    if not self.op.disks:
      self.op.disks = range(len(instance.disks))

    for disk_idx in self.op.disks:
      instance.FindDisk(disk_idx)

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = self.rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking disk/%d on %s" % (idx, node))
        cfg.SetDiskID(dev, node)
        if not self.rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find disk/%d on node %s" %
                                   (idx, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      info("checking disk/%d consistency on %s" % (idx, oth_node))
      if not _CheckDiskConsistency(self, dev, oth_node,
                                   oth_node == instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".disk%d_%s" % (idx, suf)
                  for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(self, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not self.rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not self.rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not self.rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually clean up"
                    " unused logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(self, instance, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not self.rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shut down the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = self.rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      info("checking disk/%d on %s" % (idx, pri_node))
      cfg.SetDiskID(dev, pri_node)
      if not self.rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find disk/%d on node %s" %
                                 (idx, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      info("checking disk/%d consistency on %s" % (idx, pri_node))
      if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for idx, dev in enumerate(instance.disks):
      size = dev.size
      info("adding new local storage on %s for disk/%d" %
           (new_node, idx))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))

    # Step 4: drbd minors and drbd setup changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
                                   instance.name)
    logging.debug("Allocated minors %s", minors)
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
      size = dev.size
      info("activating a new drbd on %s for disk/%d" % (new_node, idx))
      # create new devices on new_node
      if pri_node == dev.logical_id[0]:
        new_logical_id = (pri_node, new_node,
                          dev.logical_id[2], dev.logical_id[3], new_minor,
                          dev.logical_id[5])
      else:
        new_logical_id = (new_node, pri_node,
                          dev.logical_id[2], new_minor, dev.logical_id[4],
                          dev.logical_id[5])
4268
      iv_names[idx] = (dev, dev.children, new_logical_id)
4269
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4270
                    new_logical_id)
4271
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4272
                              logical_id=new_logical_id,
4273
                              children=dev.children)
4274
      if not _CreateBlockDevOnSecondary(self, new_node, instance,
4275
                                        new_drbd, False,
4276
                                        _GetInstanceInfoText(instance)):
4277
        self.cfg.ReleaseDRBDMinors(instance.name)
4278
        raise errors.OpExecError("Failed to create new DRBD on"
4279
                                 " node '%s'" % new_node)
4280

    
4281
    for idx, dev in enumerate(instance.disks):
4282
      # we have new devices, shutdown the drbd on the old secondary
4283
      info("shutting down drbd for disk/%d on old node" % idx)
4284
      cfg.SetDiskID(dev, old_node)
4285
      if not self.rpc.call_blockdev_shutdown(old_node, dev):
4286
        warning("Failed to shutdown drbd for disk/%d on old node" % idx,
4287
                hint="Please cleanup this device manually as soon as possible")
4288

    
4289
    info("detaching primary drbds from the network (=> standalone)")
4290
    done = 0
4291
    for idx, dev in enumerate(instance.disks):
4292
      cfg.SetDiskID(dev, pri_node)
4293
      # set the network part of the physical (unique in bdev terms) id
4294
      # to None, meaning detach from network
4295
      dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4296
      # and 'find' the device, which will 'fix' it to match the
4297
      # standalone state
4298
      if self.rpc.call_blockdev_find(pri_node, dev):
4299
        done += 1
4300
      else:
4301
        warning("Failed to detach drbd disk/%d from network, unusual case" %
4302
                idx)
4303

    
4304
    if not done:
4305
      # no detaches succeeded (very unlikely)
4306
      self.cfg.ReleaseDRBDMinors(instance.name)
4307
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
4308

    
4309
    # if we managed to detach at least one, we update all the disks of
4310
    # the instance to point to the new secondary
4311
    info("updating instance configuration")
4312
    for dev, _, new_logical_id in iv_names.itervalues():
4313
      dev.logical_id = new_logical_id
4314
      cfg.SetDiskID(dev, pri_node)
4315
    cfg.Update(instance)
4316
    # we can remove now the temp minors as now the new values are
4317
    # written to the config file (and therefore stable)
4318
    self.cfg.ReleaseDRBDMinors(instance.name)
4319

    
4320
    # and now perform the drbd attach
4321
    info("attaching primary drbds to new secondary (standalone => connected)")
4322
    failures = []
4323
    for idx, dev in enumerate(instance.disks):
4324
      info("attaching primary drbd for disk/%d to new secondary node" % idx)
4325
      # since the attach is smart, it's enough to 'find' the device,
4326
      # it will automatically activate the network, if the physical_id
4327
      # is correct
4328
      cfg.SetDiskID(dev, pri_node)
4329
      logging.debug("Disk to attach: %s", dev)
4330
      if not self.rpc.call_blockdev_find(pri_node, dev):
4331
        warning("can't attach drbd disk/%d to new secondary!" % idx,
4332
                "please do a gnt-instance info to see the status of disks")
4333

    
4334
    # this can fail as the old devices are degraded and _WaitForSync
4335
    # does a combined result over all disks, so we don't check its
4336
    # return value
4337
    self.proc.LogStep(5, steps_total, "sync devices")
4338
    _WaitForSync(self, instance, unlock=True)
4339

    
4340
    # so check manually all the devices
4341
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
4342
      cfg.SetDiskID(dev, pri_node)
4343
      is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5]
4344
      if is_degr:
4345
        raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
4346

    
4347
    self.proc.LogStep(6, steps_total, "removing old storage")
4348
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
4349
      info("remove logical volumes for disk/%d" % idx)
4350
      for lv in old_lvs:
4351
        cfg.SetDiskID(lv, old_node)
4352
        if not self.rpc.call_blockdev_remove(old_node, lv):
4353
          warning("Can't remove LV on old secondary",
4354
                  hint="Cleanup stale volumes by hand")
4355

    
4356
  def Exec(self, feedback_fn):
4357
    """Execute disk replacement.
4358

4359
    This dispatches the disk replacement to the appropriate handler.
4360

4361
    """
4362
    instance = self.instance
4363

    
4364
    # Activate the instance disks if we're replacing them on a down instance
4365
    if instance.status == "down":
4366
      _StartInstanceDisks(self, instance, True)
4367

    
4368
    if instance.disk_template == constants.DT_DRBD8:
4369
      if self.op.remote_node is None:
4370
        fn = self._ExecD8DiskOnly
4371
      else:
4372
        fn = self._ExecD8Secondary
4373
    else:
4374
      raise errors.ProgrammerError("Unhandled disk replacement case")
4375

    
4376
    ret = fn(feedback_fn)
4377

    
4378
    # Deactivate the instance disks if we're replacing them on a down instance
4379
    if instance.status == "down":
4380
      _SafeShutdownInstanceDisks(self, instance)
4381

    
4382
    return ret
4383

    
4384

    
4385
class LUGrowDisk(LogicalUnit):
4386
  """Grow a disk of an instance.
4387

4388
  """
4389
  HPATH = "disk-grow"
4390
  HTYPE = constants.HTYPE_INSTANCE
4391
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4392
  REQ_BGL = False
4393

    
4394
  def ExpandNames(self):
4395
    self._ExpandAndLockInstance()
4396
    self.needed_locks[locking.LEVEL_NODE] = []
4397
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4398

    
4399
  def DeclareLocks(self, level):
4400
    if level == locking.LEVEL_NODE:
4401
      self._LockInstancesNodes()
4402

    
4403
  def BuildHooksEnv(self):
4404
    """Build hooks env.
4405

4406
    This runs on the master, the primary and all the secondaries.
4407

4408
    """
4409
    env = {
4410
      "DISK": self.op.disk,
4411
      "AMOUNT": self.op.amount,
4412
      }
4413
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4414
    nl = [
4415
      self.cfg.GetMasterNode(),
4416
      self.instance.primary_node,
4417
      ]
4418
    return env, nl, nl
4419

    
4420
  def CheckPrereq(self):
4421
    """Check prerequisites.
4422

4423
    This checks that the instance is in the cluster.
4424

4425
    """
4426
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4427
    assert instance is not None, \
4428
      "Cannot retrieve locked instance %s" % self.op.instance_name
4429

    
4430
    self.instance = instance
4431

    
4432
    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4433
      raise errors.OpPrereqError("Instance's disk layout does not support"
4434
                                 " growing.")
4435

    
4436
    self.disk = instance.FindDisk(self.op.disk)
4437

    
4438
    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4439
    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4440
                                       instance.hypervisor)
4441
    for node in nodenames:
4442
      info = nodeinfo.get(node, None)
4443
      if not info:
4444
        raise errors.OpPrereqError("Cannot get current information"
4445
                                   " from node '%s'" % node)
4446
      vg_free = info.get('vg_free', None)
4447
      if not isinstance(vg_free, int):
4448
        raise errors.OpPrereqError("Can't compute free disk space on"
4449
                                   " node %s" % node)
4450
      if self.op.amount > info['vg_free']:
4451
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
4452
                                   " %d MiB available, %d MiB required" %
4453
                                   (node, info['vg_free'], self.op.amount))
4454

    
4455
  def Exec(self, feedback_fn):
4456
    """Execute disk grow.
4457

4458
    """
4459
    instance = self.instance
4460
    disk = self.disk
4461
    for node in (instance.secondary_nodes + (instance.primary_node,)):
4462
      self.cfg.SetDiskID(disk, node)
4463
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
4464
      if (not result or not isinstance(result, (list, tuple)) or
4465
          len(result) != 2):
4466
        raise errors.OpExecError("grow request failed to node %s" % node)
4467
      elif not result[0]:
4468
        raise errors.OpExecError("grow request failed to node %s: %s" %
4469
                                 (node, result[1]))
4470
    disk.RecordGrow(self.op.amount)
4471
    self.cfg.Update(instance)
4472
    if self.op.wait_for_sync:
4473
      disk_abort = not _WaitForSync(self, instance)
4474
      if disk_abort:
4475
        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
4476
                             " status.\nPlease check the instance.")
4477

    
4478

    
4479
class LUQueryInstanceData(NoHooksLU):
4480
  """Query runtime instance data.
4481

4482
  """
4483
  _OP_REQP = ["instances", "static"]
4484
  REQ_BGL = False
4485

    
4486
  def ExpandNames(self):
4487
    self.needed_locks = {}
4488
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4489

    
4490
    if not isinstance(self.op.instances, list):
4491
      raise errors.OpPrereqError("Invalid argument type 'instances'")
4492

    
4493
    if self.op.instances:
4494
      self.wanted_names = []
4495
      for name in self.op.instances:
4496
        full_name = self.cfg.ExpandInstanceName(name)
4497
        if full_name is None:
4498
          raise errors.OpPrereqError("Instance '%s' not known" %
4499
                                     self.op.instance_name)
4500
        self.wanted_names.append(full_name)
4501
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4502
    else:
4503
      self.wanted_names = None
4504
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4505

    
4506
    self.needed_locks[locking.LEVEL_NODE] = []
4507
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4508

    
4509
  def DeclareLocks(self, level):
4510
    if level == locking.LEVEL_NODE:
4511
      self._LockInstancesNodes()
4512

    
4513
  def CheckPrereq(self):
4514
    """Check prerequisites.
4515

4516
    This only checks the optional instance list against the existing names.
4517

4518
    """
4519
    if self.wanted_names is None:
4520
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4521

    
4522
    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4523
                             in self.wanted_names]
4524
    return
4525

    
4526
  def _ComputeDiskStatus(self, instance, snode, dev):
4527
    """Compute block device status.
4528

4529
    """
4530
    static = self.op.static
4531
    if not static:
4532
      self.cfg.SetDiskID(dev, instance.primary_node)
4533
      dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4534
    else:
4535
      dev_pstatus = None
4536

    
4537
    if dev.dev_type in constants.LDS_DRBD:
4538
      # we change the snode then (otherwise we use the one passed in)
4539
      if dev.logical_id[0] == instance.primary_node:
4540
        snode = dev.logical_id[1]
4541
      else:
4542
        snode = dev.logical_id[0]
4543

    
4544
    if snode and not static:
4545
      self.cfg.SetDiskID(dev, snode)
4546
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4547
    else:
4548
      dev_sstatus = None
4549

    
4550
    if dev.children:
4551
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
4552
                      for child in dev.children]
4553
    else:
4554
      dev_children = []
4555

    
4556
    data = {
4557
      "iv_name": dev.iv_name,
4558
      "dev_type": dev.dev_type,
4559
      "logical_id": dev.logical_id,
4560
      "physical_id": dev.physical_id,
4561
      "pstatus": dev_pstatus,
4562
      "sstatus": dev_sstatus,
4563
      "children": dev_children,
4564
      }
4565

    
4566
    return data
4567

    
4568
  def Exec(self, feedback_fn):
4569
    """Gather and return data"""
4570
    result = {}
4571

    
4572
    cluster = self.cfg.GetClusterInfo()
4573

    
4574
    for instance in self.wanted_instances:
4575
      if not self.op.static:
4576
        remote_info = self.rpc.call_instance_info(instance.primary_node,
4577
                                                  instance.name,
4578
                                                  instance.hypervisor)
4579
        if remote_info and "state" in remote_info:
4580
          remote_state = "up"
4581
        else:
4582
          remote_state = "down"
4583
      else:
4584
        remote_state = None
4585
      if instance.status == "down":
4586
        config_state = "down"
4587
      else:
4588
        config_state = "up"
4589

    
4590
      disks = [self._ComputeDiskStatus(instance, None, device)
4591
               for device in instance.disks]
4592

    
4593
      idict = {
4594
        "name": instance.name,
4595
        "config_state": config_state,
4596
        "run_state": remote_state,
4597
        "pnode": instance.primary_node,
4598
        "snodes": instance.secondary_nodes,
4599
        "os": instance.os,
4600
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4601
        "disks": disks,
4602
        "hypervisor": instance.hypervisor,
4603
        "network_port": instance.network_port,
4604
        "hv_instance": instance.hvparams,
4605
        "hv_actual": cluster.FillHV(instance),
4606
        "be_instance": instance.beparams,
4607
        "be_actual": cluster.FillBE(instance),
4608
        }
4609

    
4610
      result[instance.name] = idict
4611

    
4612
    return result
4613

    
4614

    
4615
class LUSetInstanceParams(LogicalUnit):
4616
  """Modifies an instances's parameters.
4617

4618
  """
4619
  HPATH = "instance-modify"
4620
  HTYPE = constants.HTYPE_INSTANCE
4621
  _OP_REQP = ["instance_name", "hvparams"]
4622
  REQ_BGL = False
4623

    
4624
  def ExpandNames(self):
4625
    self._ExpandAndLockInstance()
4626
    self.needed_locks[locking.LEVEL_NODE] = []
4627
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4628

    
4629

    
4630
  def DeclareLocks(self, level):
4631
    if level == locking.LEVEL_NODE:
4632
      self._LockInstancesNodes()
4633

    
4634
  def BuildHooksEnv(self):
4635
    """Build hooks env.
4636

4637
    This runs on the master, primary and secondaries.
4638

4639
    """
4640
    args = dict()
4641
    if constants.BE_MEMORY in self.be_new:
4642
      args['memory'] = self.be_new[constants.BE_MEMORY]
4643
    if constants.BE_VCPUS in self.be_new:
4644
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
4645
    if self.do_ip or self.do_bridge or self.mac:
4646
      if self.do_ip:
4647
        ip = self.ip
4648
      else:
4649
        ip = self.instance.nics[0].ip
4650
      if self.bridge:
4651
        bridge = self.bridge
4652
      else:
4653
        bridge = self.instance.nics[0].bridge
4654
      if self.mac:
4655
        mac = self.mac
4656
      else:
4657
        mac = self.instance.nics[0].mac
4658
      args['nics'] = [(ip, bridge, mac)]
4659
    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
4660
    nl = [self.cfg.GetMasterNode(),
4661
          self.instance.primary_node] + list(self.instance.secondary_nodes)
4662
    return env, nl, nl
4663

    
4664
  def CheckPrereq(self):
4665
    """Check prerequisites.
4666

4667
    This only checks the instance list against the existing names.
4668

4669
    """
4670
    # FIXME: all the parameters could be checked before, in ExpandNames, or in
4671
    # a separate CheckArguments function, if we implement one, so the operation
4672
    # can be aborted without waiting for any lock, should it have an error...
4673
    self.ip = getattr(self.op, "ip", None)
4674
    self.mac = getattr(self.op, "mac", None)
4675
    self.bridge = getattr(self.op, "bridge", None)
4676
    self.kernel_path = getattr(self.op, "kernel_path", None)
4677
    self.initrd_path = getattr(self.op, "initrd_path", None)
4678
    self.force = getattr(self.op, "force", None)
4679
    all_parms = [self.ip, self.bridge, self.mac]
4680
    if (all_parms.count(None) == len(all_parms) and
4681
        not self.op.hvparams and
4682
        not self.op.beparams):
4683
      raise errors.OpPrereqError("No changes submitted")
4684
    for item in (constants.BE_MEMORY, constants.BE_VCPUS):
4685
      val = self.op.beparams.get(item, None)
4686
      if val is not None:
4687
        try:
4688
          val = int(val)
4689
        except ValueError, err:
4690
          raise errors.OpPrereqError("Invalid %s size: %s" % (item, str(err)))
4691
        self.op.beparams[item] = val
4692
    if self.ip is not None:
4693
      self.do_ip = True
4694
      if self.ip.lower() == "none":
4695
        self.ip = None
4696
      else:
4697
        if not utils.IsValidIP(self.ip):
4698
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4699
    else:
4700
      self.do_ip = False
4701
    self.do_bridge = (self.bridge is not None)
4702
    if self.mac is not None:
4703
      if self.cfg.IsMacInUse(self.mac):
4704
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4705
                                   self.mac)
4706
      if not utils.IsValidMac(self.mac):
4707
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4708

    
4709
    # checking the new params on the primary/secondary nodes
4710

    
4711
    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4712
    assert self.instance is not None, \
4713
      "Cannot retrieve locked instance %s" % self.op.instance_name
4714
    pnode = self.instance.primary_node
4715
    nodelist = [pnode]
4716
    nodelist.extend(instance.secondary_nodes)
4717

    
4718
    # hvparams processing
4719
    if self.op.hvparams:
4720
      i_hvdict = copy.deepcopy(instance.hvparams)
4721
      for key, val in self.op.hvparams.iteritems():
4722
        if val is None:
4723
          try:
4724
            del i_hvdict[key]
4725
          except KeyError:
4726
            pass
4727
        else:
4728
          i_hvdict[key] = val
4729
      cluster = self.cfg.GetClusterInfo()
4730
      hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
4731
                                i_hvdict)
4732
      # local check
4733
      hypervisor.GetHypervisor(
4734
        instance.hypervisor).CheckParameterSyntax(hv_new)
4735
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
4736
      self.hv_new = hv_new # the new actual values
4737
      self.hv_inst = i_hvdict # the new dict (without defaults)
4738
    else:
4739
      self.hv_new = self.hv_inst = {}
4740

    
4741
    # beparams processing
4742
    if self.op.beparams:
4743
      i_bedict = copy.deepcopy(instance.beparams)
4744
      for key, val in self.op.beparams.iteritems():
4745
        if val is None:
4746
          try:
4747
            del i_bedict[key]
4748
          except KeyError:
4749
            pass
4750
        else:
4751
          i_bedict[key] = val
4752
      cluster = self.cfg.GetClusterInfo()
4753
      be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4754
                                i_bedict)
4755
      self.be_new = be_new # the new actual values
4756
      self.be_inst = i_bedict # the new dict (without defaults)
4757
    else:
4758
      self.hv_new = self.hv_inst = {}
4759

    
4760
    self.warn = []
4761

    
4762
    if constants.BE_MEMORY in self.op.beparams and not self.force:
4763
      mem_check_list = [pnode]
4764
      if be_new[constants.BE_AUTO_BALANCE]:
4765
        # either we changed auto_balance to yes or it was from before
4766
        mem_check_list.extend(instance.secondary_nodes)
4767
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
4768
                                                  instance.hypervisor)
4769
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
4770
                                         instance.hypervisor)
4771

    
4772
      if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
4773
        # Assume the primary node is unreachable and go ahead
4774
        self.warn.append("Can't get info from primary node %s" % pnode)
4775
      else:
4776
        if instance_info:
4777
          current_mem = instance_info['memory']
4778
        else:
4779
          # Assume instance not running
4780
          # (there is a slight race condition here, but it's not very probable,
4781
          # and we have no other way to check)
4782
          current_mem = 0
4783
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
4784
                    nodeinfo[pnode]['memory_free'])
4785
        if miss_mem > 0:
4786
          raise errors.OpPrereqError("This change will prevent the instance"
4787
                                     " from starting, due to %d MB of memory"
4788
                                     " missing on its primary node" % miss_mem)
4789

    
4790
      if be_new[constants.BE_AUTO_BALANCE]:
4791
        for node in instance.secondary_nodes:
4792
          if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
4793
            self.warn.append("Can't get info from secondary node %s" % node)
4794
          elif be_new[constants.BE_MEMORY] > nodeinfo[node]['memory_free']:
4795
            self.warn.append("Not enough memory to failover instance to"
4796
                             " secondary node %s" % node)
4797

    
4798
    return
4799

    
4800
  def Exec(self, feedback_fn):
4801
    """Modifies an instance.
4802

4803
    All parameters take effect only at the next restart of the instance.
4804
    """
4805
    # Process here the warnings from CheckPrereq, as we don't have a
4806
    # feedback_fn there.
4807
    for warn in self.warn:
4808
      feedback_fn("WARNING: %s" % warn)
4809

    
4810
    result = []
4811
    instance = self.instance
4812
    if self.do_ip:
4813
      instance.nics[0].ip = self.ip
4814
      result.append(("ip", self.ip))
4815
    if self.bridge:
4816
      instance.nics[0].bridge = self.bridge
4817
      result.append(("bridge", self.bridge))
4818
    if self.mac:
4819
      instance.nics[0].mac = self.mac
4820
      result.append(("mac", self.mac))
4821
    if self.op.hvparams:
4822
      instance.hvparams = self.hv_new
4823
      for key, val in self.op.hvparams.iteritems():
4824
        result.append(("hv/%s" % key, val))
4825
    if self.op.beparams:
4826
      instance.beparams = self.be_inst
4827
      for key, val in self.op.beparams.iteritems():
4828
        result.append(("be/%s" % key, val))
4829

    
4830
    self.cfg.Update(instance)
4831

    
4832
    return result
4833

    
4834

    
4835
class LUQueryExports(NoHooksLU):
4836
  """Query the exports list
4837

4838
  """
4839
  _OP_REQP = ['nodes']
4840
  REQ_BGL = False
4841

    
4842
  def ExpandNames(self):
4843
    self.needed_locks = {}
4844
    self.share_locks[locking.LEVEL_NODE] = 1
4845
    if not self.op.nodes:
4846
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4847
    else:
4848
      self.needed_locks[locking.LEVEL_NODE] = \
4849
        _GetWantedNodes(self, self.op.nodes)
4850

    
4851
  def CheckPrereq(self):
4852
    """Check prerequisites.
4853

4854
    """
4855
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
4856

    
4857
  def Exec(self, feedback_fn):
4858
    """Compute the list of all the exported system images.
4859

4860
    Returns:
4861
      a dictionary with the structure node->(export-list)
4862
      where export-list is a list of the instances exported on
4863
      that node.
4864

4865
    """
4866
    return self.rpc.call_export_list(self.nodes)
4867

    
4868

    
4869
class LUExportInstance(LogicalUnit):
4870
  """Export an instance to an image in the cluster.
4871

4872
  """
4873
  HPATH = "instance-export"
4874
  HTYPE = constants.HTYPE_INSTANCE
4875
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
4876
  REQ_BGL = False
4877

    
4878
  def ExpandNames(self):
4879
    self._ExpandAndLockInstance()
4880
    # FIXME: lock only instance primary and destination node
4881
    #
4882
    # Sad but true, for now we have do lock all nodes, as we don't know where
4883
    # the previous export might be, and and in this LU we search for it and
4884
    # remove it from its current node. In the future we could fix this by:
4885
    #  - making a tasklet to search (share-lock all), then create the new one,
4886
    #    then one to remove, after
4887
    #  - removing the removal operation altoghether
4888
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4889

    
4890
  def DeclareLocks(self, level):
4891
    """Last minute lock declaration."""
4892
    # All nodes are locked anyway, so nothing to do here.
4893

    
4894
  def BuildHooksEnv(self):
4895
    """Build hooks env.
4896

4897
    This will run on the master, primary node and target node.
4898

4899
    """
4900
    env = {
4901
      "EXPORT_NODE": self.op.target_node,
4902
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4903
      }
4904
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4905
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
4906
          self.op.target_node]
4907
    return env, nl, nl
4908

    
4909
  def CheckPrereq(self):
4910
    """Check prerequisites.
4911

4912
    This checks that the instance and node names are valid.
4913

4914
    """
4915
    instance_name = self.op.instance_name
4916
    self.instance = self.cfg.GetInstanceInfo(instance_name)
4917
    assert self.instance is not None, \
4918
          "Cannot retrieve locked instance %s" % self.op.instance_name
4919

    
4920
    self.dst_node = self.cfg.GetNodeInfo(
4921
      self.cfg.ExpandNodeName(self.op.target_node))
4922

    
4923
    assert self.dst_node is not None, \
4924
          "Cannot retrieve locked node %s" % self.op.target_node
4925

    
4926
    # instance disk type verification
4927
    for disk in self.instance.disks:
4928
      if disk.dev_type == constants.LD_FILE:
4929
        raise errors.OpPrereqError("Export not supported for instances with"
4930
                                   " file-based disks")
4931

    
4932
  def Exec(self, feedback_fn):
4933
    """Export an instance to an image in the cluster.
4934

4935
    """
4936
    instance = self.instance
4937
    dst_node = self.dst_node
4938
    src_node = instance.primary_node
4939
    if self.op.shutdown:
4940
      # shutdown the instance, but not the disks
4941
      if not self.rpc.call_instance_shutdown(src_node, instance):
4942
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4943
                                 (instance.name, src_node))
4944

    
4945
    vgname = self.cfg.GetVGName()
4946

    
4947
    snap_disks = []
4948

    
4949
    try:
4950
      for disk in instance.disks:
4951
        # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4952
        new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
4953

    
4954
        if not new_dev_name:
4955
          self.LogWarning("Could not snapshot block device %s on node %s",
4956
                          disk.logical_id[1], src_node)
4957
          snap_disks.append(False)
4958
        else:
4959
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4960
                                 logical_id=(vgname, new_dev_name),
4961
                                 physical_id=(vgname, new_dev_name),
4962
                                 iv_name=disk.iv_name)
4963
          snap_disks.append(new_dev)
4964

    
4965
    finally:
4966
      if self.op.shutdown and instance.status == "up":
4967
        if not self.rpc.call_instance_start(src_node, instance, None):
4968
          _ShutdownInstanceDisks(self, instance)
4969
          raise errors.OpExecError("Could not start instance")
4970

    
4971
    # TODO: check for size
4972

    
4973
    cluster_name = self.cfg.GetClusterName()
4974
    for idx, dev in enumerate(snap_disks):
4975
      if dev:
4976
        if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
4977
                                             instance, cluster_name, idx):
4978
          self.LogWarning("Could not export block device %s from node %s to"
4979
                          " node %s", dev.logical_id[1], src_node,
4980
                          dst_node.name)
4981
        if not self.rpc.call_blockdev_remove(src_node, dev):
4982
          self.LogWarning("Could not remove snapshot block device %s from node"
4983
                          " %s", dev.logical_id[1], src_node)
4984

    
4985
    if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4986
      self.LogWarning("Could not finalize export for instance %s on node %s",
4987
                      instance.name, dst_node.name)
4988

    
4989
    nodelist = self.cfg.GetNodeList()
4990
    nodelist.remove(dst_node.name)
4991

    
4992
    # on one-node clusters nodelist will be empty after the removal
4993
    # if we proceed the backup would be removed because OpQueryExports
4994
    # substitutes an empty list with the full cluster node list.
4995
    if nodelist:
4996
      exportlist = self.rpc.call_export_list(nodelist)
4997
      for node in exportlist:
4998
        if instance.name in exportlist[node]:
4999
          if not self.rpc.call_export_remove(node, instance.name):
5000
            self.LogWarning("Could not remove older export for instance %s"
5001
                            " on node %s", instance.name, node)
5002

    
5003

    
5004
class LURemoveExport(NoHooksLU):
5005
  """Remove exports related to the named instance.
5006

5007
  """
5008
  _OP_REQP = ["instance_name"]
5009
  REQ_BGL = False
5010

    
5011
  def ExpandNames(self):
5012
    self.needed_locks = {}
5013
    # We need all nodes to be locked in order for RemoveExport to work, but we
5014
    # don't need to lock the instance itself, as nothing will happen to it (and
5015
    # we can remove exports also for a removed instance)
5016
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5017

    
5018
  def CheckPrereq(self):
5019
    """Check prerequisites.
5020
    """
5021
    pass
5022

    
5023
  def Exec(self, feedback_fn):
5024
    """Remove any export.
5025

5026
    """
5027
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5028
    # If the instance was not found we'll try with the name that was passed in.
5029
    # This will only work if it was an FQDN, though.
5030
    fqdn_warn = False
5031
    if not instance_name:
5032
      fqdn_warn = True
5033
      instance_name = self.op.instance_name
5034

    
5035
    exportlist = self.rpc.call_export_list(self.acquired_locks[
5036
      locking.LEVEL_NODE])
5037
    found = False
5038
    for node in exportlist:
5039
      if instance_name in exportlist[node]:
5040
        found = True
5041
        if not self.rpc.call_export_remove(node, instance_name):
5042
          logging.error("Could not remove export for instance %s"
5043
                        " on node %s", instance_name, node)
5044

    
5045
    if fqdn_warn and not found:
5046
      feedback_fn("Export not found. If trying to remove an export belonging"
5047
                  " to a deleted instance please use its Fully Qualified"
5048
                  " Domain Name.")
5049

    
5050

    
5051
class TagsLU(NoHooksLU):
5052
  """Generic tags LU.
5053

5054
  This is an abstract class which is the parent of all the other tags LUs.
5055

5056
  """
5057

    
5058
  def ExpandNames(self):
5059
    self.needed_locks = {}
5060
    if self.op.kind == constants.TAG_NODE:
5061
      name = self.cfg.ExpandNodeName(self.op.name)
5062
      if name is None:
5063
        raise errors.OpPrereqError("Invalid node name (%s)" %
5064
                                   (self.op.name,))
5065
      self.op.name = name
5066
      self.needed_locks[locking.LEVEL_NODE] = name
5067
    elif self.op.kind == constants.TAG_INSTANCE:
5068
      name = self.cfg.ExpandInstanceName(self.op.name)
5069
      if name is None:
5070
        raise errors.OpPrereqError("Invalid instance name (%s)" %
5071
                                   (self.op.name,))
5072
      self.op.name = name
5073
      self.needed_locks[locking.LEVEL_INSTANCE] = name
5074

    
5075
  def CheckPrereq(self):
5076
    """Check prerequisites.
5077

5078
    """
5079
    if self.op.kind == constants.TAG_CLUSTER:
5080
      self.target = self.cfg.GetClusterInfo()
5081
    elif self.op.kind == constants.TAG_NODE:
5082
      self.target = self.cfg.GetNodeInfo(self.op.name)
5083
    elif self.op.kind == constants.TAG_INSTANCE:
5084
      self.target = self.cfg.GetInstanceInfo(self.op.name)
5085
    else:
5086
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5087
                                 str(self.op.kind))
5088

    
5089

    
5090
class LUGetTags(TagsLU):
5091
  """Returns the tags of a given object.
5092

5093
  """
5094
  _OP_REQP = ["kind", "name"]
5095
  REQ_BGL = False
5096

    
5097
  def Exec(self, feedback_fn):
5098
    """Returns the tag list.
5099

5100
    """
5101
    return list(self.target.GetTags())
5102

    
5103

    
5104
class LUSearchTags(NoHooksLU):
5105
  """Searches the tags for a given pattern.
5106

5107
  """
5108
  _OP_REQP = ["pattern"]
5109
  REQ_BGL = False
5110

    
5111
  def ExpandNames(self):
5112
    self.needed_locks = {}
5113

    
5114
  def CheckPrereq(self):
5115
    """Check prerequisites.
5116

5117
    This checks the pattern passed for validity by compiling it.
5118

5119
    """
5120
    try:
5121
      self.re = re.compile(self.op.pattern)
5122
    except re.error, err:
5123
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5124
                                 (self.op.pattern, err))
5125

    
5126
  def Exec(self, feedback_fn):
5127
    """Returns the tag list.
5128

5129
    """
5130
    cfg = self.cfg
5131
    tgts = [("/cluster", cfg.GetClusterInfo())]
5132
    ilist = cfg.GetAllInstancesInfo().values()
5133
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5134
    nlist = cfg.GetAllNodesInfo().values()
5135
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5136
    results = []
5137
    for path, target in tgts:
5138
      for tag in target.GetTags():
5139
        if self.re.search(tag):
5140
          results.append((path, tag))
5141
    return results
5142

    
5143

    
5144
class LUAddTags(TagsLU):
5145
  """Sets a tag on a given object.
5146

5147
  """
5148
  _OP_REQP = ["kind", "name", "tags"]
5149
  REQ_BGL = False
5150

    
5151
  def CheckPrereq(self):
5152
    """Check prerequisites.
5153

5154
    This checks the type and length of the tag name and value.
5155

5156
    """
5157
    TagsLU.CheckPrereq(self)
5158
    for tag in self.op.tags:
5159
      objects.TaggableObject.ValidateTag(tag)
5160

    
5161
  def Exec(self, feedback_fn):
5162
    """Sets the tag.
5163

5164
    """
5165
    try:
5166
      for tag in self.op.tags:
5167
        self.target.AddTag(tag)
5168
    except errors.TagError, err:
5169
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
5170
    try:
5171
      self.cfg.Update(self.target)
5172
    except errors.ConfigurationError:
5173
      raise errors.OpRetryError("There has been a modification to the"
5174
                                " config file and the operation has been"
5175
                                " aborted. Please retry.")
5176

    
5177

    
5178
class LUDelTags(TagsLU):
5179
  """Delete a list of tags from a given object.
5180

5181
  """
5182
  _OP_REQP = ["kind", "name", "tags"]
5183
  REQ_BGL = False
5184

    
5185
  def CheckPrereq(self):
5186
    """Check prerequisites.
5187

5188
    This checks that we have the given tag.
5189

5190
    """
5191
    TagsLU.CheckPrereq(self)
5192
    for tag in self.op.tags:
5193
      objects.TaggableObject.ValidateTag(tag)
5194
    del_tags = frozenset(self.op.tags)
5195
    cur_tags = self.target.GetTags()
5196
    if not del_tags <= cur_tags:
5197
      diff_tags = del_tags - cur_tags
5198
      diff_names = ["'%s'" % tag for tag in diff_tags]
5199
      diff_names.sort()
5200
      raise errors.OpPrereqError("Tag(s) %s not found" %
5201
                                 (",".join(diff_names)))
5202

    
5203
  def Exec(self, feedback_fn):
5204
    """Remove the tag from the object.
5205

5206
    """
5207
    for tag in self.op.tags:
5208
      self.target.RemoveTag(tag)
5209
    try:
5210
      self.cfg.Update(self.target)
5211
    except errors.ConfigurationError:
5212
      raise errors.OpRetryError("There has been a modification to the"
5213
                                " config file and the operation has been"
5214
                                " aborted. Please retry.")
5215

    
5216

    
5217
class LUTestDelay(NoHooksLU):
5218
  """Sleep for a specified amount of time.
5219

5220
  This LU sleeps on the master and/or nodes for a specified amount of
5221
  time.
5222

5223
  """
5224
  _OP_REQP = ["duration", "on_master", "on_nodes"]
5225
  REQ_BGL = False
5226

    
5227
  def ExpandNames(self):
5228
    """Expand names and set required locks.
5229

5230
    This expands the node list, if any.
5231

5232
    """
5233
    self.needed_locks = {}
5234
    if self.op.on_nodes:
5235
      # _GetWantedNodes can be used here, but is not always appropriate to use
5236
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
5237
      # more information.
5238
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5239
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5240

    
5241
  def CheckPrereq(self):
5242
    """Check prerequisites.
5243

5244
    """
5245

    
5246
  def Exec(self, feedback_fn):
5247
    """Do the actual sleep.
5248

5249
    """
5250
    if self.op.on_master:
5251
      if not utils.TestDelay(self.op.duration):
5252
        raise errors.OpExecError("Error during master delay test")
5253
    if self.op.on_nodes:
5254
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5255
      if not result:
5256
        raise errors.OpExecError("Complete failure from rpc call")
5257
      for node, node_result in result.items():
5258
        if not node_result:
5259
          raise errors.OpExecError("Failure during rpc call to node %s,"
5260
                                   " result: %s" % (node, node_result))
5261

    
5262

    
5263
class IAllocator(object):
5264
  """IAllocator framework.
5265

5266
  An IAllocator instance has three sets of attributes:
5267
    - cfg that is needed to query the cluster
5268
    - input data (all members of the _KEYS class attribute are required)
5269
    - four buffer attributes (in|out_data|text), that represent the
5270
      input (to the external script) in text and data structure format,
5271
      and the output from it, again in two formats
5272
    - the result variables from the script (success, info, nodes) for
5273
      easy usage
5274

5275
  """
5276
  _ALLO_KEYS = [
5277
    "mem_size", "disks", "disk_template",
5278
    "os", "tags", "nics", "vcpus",
5279
    ]
5280
  _RELO_KEYS = [
5281
    "relocate_from",
5282
    ]
5283

    
5284
  def __init__(self, lu, mode, name, **kwargs):
5285
    self.lu = lu
5286
    # init buffer variables
5287
    self.in_text = self.out_text = self.in_data = self.out_data = None
5288
    # init all input fields so that pylint is happy
5289
    self.mode = mode
5290
    self.name = name
5291
    self.mem_size = self.disks = self.disk_template = None
5292
    self.os = self.tags = self.nics = self.vcpus = None
5293
    self.relocate_from = None
5294
    # computed fields
5295
    self.required_nodes = None
5296
    # init result fields
5297
    self.success = self.info = self.nodes = None
5298
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5299
      keyset = self._ALLO_KEYS
5300
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5301
      keyset = self._RELO_KEYS
5302
    else:
5303
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5304
                                   " IAllocator" % self.mode)
5305
    for key in kwargs:
5306
      if key not in keyset:
5307
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
5308
                                     " IAllocator" % key)
5309
      setattr(self, key, kwargs[key])
5310
    for key in keyset:
5311
      if key not in kwargs:
5312
        raise errors.ProgrammerError("Missing input parameter '%s' to"
5313
                                     " IAllocator" % key)
5314
    self._BuildInputData()
5315

    
5316
  def _ComputeClusterData(self):
5317
    """Compute the generic allocator input data.
5318

5319
    This is the data that is independent of the actual operation.
5320

5321
    """
5322
    cfg = self.lu.cfg
5323
    cluster_info = cfg.GetClusterInfo()
5324
    # cluster data
5325
    data = {
5326
      "version": 1,
5327
      "cluster_name": cfg.GetClusterName(),
5328
      "cluster_tags": list(cluster_info.GetTags()),
5329
      "enable_hypervisors": list(cluster_info.enabled_hypervisors),
5330
      # we don't have job IDs
5331
      }
5332

    
5333
    i_list = []
5334
    cluster = self.cfg.GetClusterInfo()
5335
    for iname in cfg.GetInstanceList():
5336
      i_obj = cfg.GetInstanceInfo(iname)
5337
      i_list.append((i_obj, cluster.FillBE(i_obj)))
5338

    
5339
    # node data
5340
    node_results = {}
5341
    node_list = cfg.GetNodeList()
5342
    # FIXME: here we have only one hypervisor information, but
5343
    # instance can belong to different hypervisors
5344
    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
5345
                                           cfg.GetHypervisorType())
5346
    for nname in node_list:
5347
      ninfo = cfg.GetNodeInfo(nname)
5348
      if nname not in node_data or not isinstance(node_data[nname], dict):
5349
        raise errors.OpExecError("Can't get data for node %s" % nname)
5350
      remote_info = node_data[nname]
5351
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
5352
                   'vg_size', 'vg_free', 'cpu_total']:
5353
        if attr not in remote_info:
5354
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5355
                                   (nname, attr))
5356
        try:
5357
          remote_info[attr] = int(remote_info[attr])
5358
        except ValueError, err:
5359
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5360
                                   " %s" % (nname, attr, str(err)))
5361
      # compute memory used by primary instances
5362
      i_p_mem = i_p_up_mem = 0
5363
      for iinfo, beinfo in i_list:
5364
        if iinfo.primary_node == nname:
5365
          i_p_mem += beinfo[constants.BE_MEMORY]
5366
          if iinfo.status == "up":
5367
            i_p_up_mem += beinfo[constants.BE_MEMORY]
5368

    
5369
      # compute memory used by instances
5370
      pnr = {
5371
        "tags": list(ninfo.GetTags()),
5372
        "total_memory": remote_info['memory_total'],
5373
        "reserved_memory": remote_info['memory_dom0'],
5374
        "free_memory": remote_info['memory_free'],
5375
        "i_pri_memory": i_p_mem,
5376
        "i_pri_up_memory": i_p_up_mem,
5377
        "total_disk": remote_info['vg_size'],
5378
        "free_disk": remote_info['vg_free'],
5379
        "primary_ip": ninfo.primary_ip,
5380
        "secondary_ip": ninfo.secondary_ip,
5381
        "total_cpus": remote_info['cpu_total'],
5382
        }
5383
      node_results[nname] = pnr
5384
    data["nodes"] = node_results
5385

    
5386
    # instance data
5387
    instance_data = {}
5388
    for iinfo, beinfo in i_list:
5389
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5390
                  for n in iinfo.nics]
5391
      pir = {
5392
        "tags": list(iinfo.GetTags()),
5393
        "should_run": iinfo.status == "up",
5394
        "vcpus": beinfo[constants.BE_VCPUS],
5395
        "memory": beinfo[constants.BE_MEMORY],
5396
        "os": iinfo.os,
5397
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5398
        "nics": nic_data,
5399
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5400
        "disk_template": iinfo.disk_template,
5401
        "hypervisor": iinfo.hypervisor,
5402
        }
5403
      instance_data[iinfo.name] = pir
5404

    
5405
    data["instances"] = instance_data
5406

    
5407
    self.in_data = data
5408

    
5409
  def _AddNewInstance(self):
5410
    """Add new instance data to allocator structure.
5411

5412
    This in combination with _AllocatorGetClusterData will create the
5413
    correct structure needed as input for the allocator.
5414

5415
    The checks for the completeness of the opcode must have already been
5416
    done.
5417

5418
    """
5419
    data = self.in_data
5420
    if len(self.disks) != 2:
5421
      raise errors.OpExecError("Only two-disk configurations supported")
5422

    
5423
    disk_space = _ComputeDiskSize(self.disk_template,
5424
                                  self.disks[0]["size"], self.disks[1]["size"])
5425

    
5426
    if self.disk_template in constants.DTS_NET_MIRROR:
5427
      self.required_nodes = 2
5428
    else:
5429
      self.required_nodes = 1
5430
    request = {
5431
      "type": "allocate",
5432
      "name": self.name,
5433
      "disk_template": self.disk_template,
5434
      "tags": self.tags,
5435
      "os": self.os,
5436
      "vcpus": self.vcpus,
5437
      "memory": self.mem_size,
5438
      "disks": self.disks,
5439
      "disk_space_total": disk_space,
5440
      "nics": self.nics,
5441
      "required_nodes": self.required_nodes,
5442
      }
5443
    data["request"] = request
5444

    
5445
  def _AddRelocateInstance(self):
5446
    """Add relocate instance data to allocator structure.
5447

5448
    This in combination with _IAllocatorGetClusterData will create the
5449
    correct structure needed as input for the allocator.
5450

5451
    The checks for the completeness of the opcode must have already been
5452
    done.
5453

5454
    """
5455
    instance = self.lu.cfg.GetInstanceInfo(self.name)
5456
    if instance is None:
5457
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
5458
                                   " IAllocator" % self.name)
5459

    
5460
    if instance.disk_template not in constants.DTS_NET_MIRROR:
5461
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5462

    
5463
    if len(instance.secondary_nodes) != 1:
5464
      raise errors.OpPrereqError("Instance has not exactly one secondary node")
5465

    
5466
    self.required_nodes = 1
5467

    
5468
    disk_space = _ComputeDiskSize(instance.disk_template,
5469
                                  instance.disks[0].size,
5470
                                  instance.disks[1].size)
5471

    
5472
    request = {
5473
      "type": "relocate",
5474
      "name": self.name,
5475
      "disk_space_total": disk_space,
5476
      "required_nodes": self.required_nodes,
5477
      "relocate_from": self.relocate_from,
5478
      }
5479
    self.in_data["request"] = request
5480

    
5481
  def _BuildInputData(self):
5482
    """Build input data structures.
5483

5484
    """
5485
    self._ComputeClusterData()
5486

    
5487
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5488
      self._AddNewInstance()
5489
    else:
5490
      self._AddRelocateInstance()
5491

    
5492
    self.in_text = serializer.Dump(self.in_data)
5493

    
5494
  def Run(self, name, validate=True, call_fn=None):
5495
    """Run an instance allocator and return the results.
5496

5497
    """
5498
    if call_fn is None:
5499
      call_fn = self.lu.rpc.call_iallocator_runner
5500
    data = self.in_text
5501

    
5502
    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
5503

    
5504
    if not isinstance(result, (list, tuple)) or len(result) != 4:
5505
      raise errors.OpExecError("Invalid result from master iallocator runner")
5506

    
5507
    rcode, stdout, stderr, fail = result
5508

    
5509
    if rcode == constants.IARUN_NOTFOUND:
5510
      raise errors.OpExecError("Can't find allocator '%s'" % name)
5511
    elif rcode == constants.IARUN_FAILURE:
5512
      raise errors.OpExecError("Instance allocator call failed: %s,"
5513
                               " output: %s" % (fail, stdout+stderr))
5514
    self.out_text = stdout
5515
    if validate:
5516
      self._ValidateResult()
5517

    
5518
  def _ValidateResult(self):
5519
    """Process the allocator results.
5520

5521
    This will process and if successful save the result in
5522
    self.out_data and the other parameters.
5523

5524
    """
5525
    try:
5526
      rdict = serializer.Load(self.out_text)
5527
    except Exception, err:
5528
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5529

    
5530
    if not isinstance(rdict, dict):
5531
      raise errors.OpExecError("Can't parse iallocator results: not a dict")
5532

    
5533
    for key in "success", "info", "nodes":
5534
      if key not in rdict:
5535
        raise errors.OpExecError("Can't parse iallocator results:"
5536
                                 " missing key '%s'" % key)
5537
      setattr(self, key, rdict[key])
5538

    
5539
    if not isinstance(rdict["nodes"], list):
5540
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5541
                               " is not a list")
5542
    self.out_data = rdict
5543

    
5544

    
5545
class LUTestAllocator(NoHooksLU):
5546
  """Run allocator tests.
5547

5548
  This LU runs the allocator tests
5549

5550
  """
5551
  _OP_REQP = ["direction", "mode", "name"]
5552

    
5553
  def CheckPrereq(self):
5554
    """Check prerequisites.
5555

5556
    This checks the opcode parameters depending on the director and mode test.
5557

5558
    """
5559
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5560
      for attr in ["name", "mem_size", "disks", "disk_template",
5561
                   "os", "tags", "nics", "vcpus"]:
5562
        if not hasattr(self.op, attr):
5563
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5564
                                     attr)
5565
      iname = self.cfg.ExpandInstanceName(self.op.name)
5566
      if iname is not None:
5567
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5568
                                   iname)
5569
      if not isinstance(self.op.nics, list):
5570
        raise errors.OpPrereqError("Invalid parameter 'nics'")
5571
      for row in self.op.nics:
5572
        if (not isinstance(row, dict) or
5573
            "mac" not in row or
5574
            "ip" not in row or
5575
            "bridge" not in row):
5576
          raise errors.OpPrereqError("Invalid contents of the"
5577
                                     " 'nics' parameter")
5578
      if not isinstance(self.op.disks, list):
5579
        raise errors.OpPrereqError("Invalid parameter 'disks'")
5580
      if len(self.op.disks) != 2:
5581
        raise errors.OpPrereqError("Only two-disk configurations supported")
5582
      for row in self.op.disks:
5583
        if (not isinstance(row, dict) or
5584
            "size" not in row or
5585
            not isinstance(row["size"], int) or
5586
            "mode" not in row or
5587
            row["mode"] not in ['r', 'w']):
5588
          raise errors.OpPrereqError("Invalid contents of the"
5589
                                     " 'disks' parameter")
5590
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5591
      if not hasattr(self.op, "name"):
5592
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5593
      fname = self.cfg.ExpandInstanceName(self.op.name)
5594
      if fname is None:
5595
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5596
                                   self.op.name)
5597
      self.op.name = fname
5598
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5599
    else:
5600
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5601
                                 self.op.mode)
5602

    
5603
    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5604
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
5605
        raise errors.OpPrereqError("Missing allocator name")
5606
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5607
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
5608
                                 self.op.direction)
5609

    
5610
  def Exec(self, feedback_fn):
5611
    """Run the allocator test.
5612

5613
    """
5614
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5615
      ial = IAllocator(self,
5616
                       mode=self.op.mode,
5617
                       name=self.op.name,
5618
                       mem_size=self.op.mem_size,
5619
                       disks=self.op.disks,
5620
                       disk_template=self.op.disk_template,
5621
                       os=self.op.os,
5622
                       tags=self.op.tags,
5623
                       nics=self.op.nics,
5624
                       vcpus=self.op.vcpus,
5625
                       )
5626
    else:
5627
      ial = IAllocator(self,
5628
                       mode=self.op.mode,
5629
                       name=self.op.name,
5630
                       relocate_from=list(self.relocate_from),
5631
                       )
5632

    
5633
    if self.op.direction == constants.IALLOCATOR_DIR_IN:
5634
      result = ial.in_text
5635
    else:
5636
      ial.Run(self.op.allocator, validate=False)
5637
      result = ial.out_text
5638
    return result