root / lib / cmdlib.py @ 3977a4c1


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import locking
41
from ganeti import config
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import opcodes
45
from ganeti import ssconf
46
from ganeti import serializer
47

    
48

    
49
class LogicalUnit(object):
50
  """Logical Unit base class.
51

52
  Subclasses must follow these rules:
53
    - implement ExpandNames
54
    - implement CheckPrereq
55
    - implement Exec
56
    - implement BuildHooksEnv
57
    - redefine HPATH and HTYPE
58
    - optionally redefine their run requirements:
59
        REQ_MASTER: the LU needs to run on the master node
60
        REQ_WSSTORE: the LU needs a writable SimpleStore
61
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
62

63
  Note that all commands require root permissions.
64

65
  """
66
  HPATH = None
67
  HTYPE = None
68
  _OP_REQP = []
69
  REQ_MASTER = True
70
  REQ_WSSTORE = False
71
  REQ_BGL = True
72

    
73
  def __init__(self, processor, op, context, sstore):
74
    """Constructor for LogicalUnit.
75

76
    This needs to be overridden in derived classes in order to check op
77
    validity.
78

79
    """
80
    self.proc = processor
81
    self.op = op
82
    self.cfg = context.cfg
83
    self.sstore = sstore
84
    self.context = context
85
    self.needed_locks = None
86
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
87
    self.__ssh = None
88

    
89
    for attr_name in self._OP_REQP:
90
      attr_val = getattr(op, attr_name, None)
91
      if attr_val is None:
92
        raise errors.OpPrereqError("Required parameter '%s' missing" %
93
                                   attr_name)
94

    
95
    if not self.cfg.IsCluster():
96
      raise errors.OpPrereqError("Cluster not initialized yet,"
97
                                 " use 'gnt-cluster init' first.")
98
    if self.REQ_MASTER:
99
      master = sstore.GetMasterNode()
100
      if master != utils.HostInfo().name:
101
        raise errors.OpPrereqError("Commands must be run on the master"
102
                                   " node %s" % master)
103

    
104
  def __GetSSH(self):
105
    """Returns the SshRunner object
106

107
    """
108
    if not self.__ssh:
109
      self.__ssh = ssh.SshRunner(self.sstore)
110
    return self.__ssh
111

    
112
  ssh = property(fget=__GetSSH)
113

    
114
  def ExpandNames(self):
115
    """Expand names for this LU.
116

117
    This method is called before starting to execute the opcode, and it should
118
    update all the parameters of the opcode to their canonical form (e.g. a
119
    short node name must be fully expanded after this method has successfully
120
    completed). This way locking, hooks, logging, etc. can work correctly.
121

122
    LUs which implement this method must also populate the self.needed_locks
123
    member, as a dict with lock levels as keys, and a list of needed lock names
124
    as values. Rules:
125
      - Use an empty dict if you don't need any lock
126
      - If you don't need any lock at a particular level omit that level
127
      - Don't put anything for the BGL level
128
      - If you want all locks at a level use None as a value
129
        (this reflects what LockSet does, and will be replaced before
130
        CheckPrereq with the full list of nodes that have been locked)
131

132
    If you need to share locks (rather than acquire them exclusively) at one
133
    level you can modify self.share_locks, setting a true value (usually 1) for
134
    that level. By default locks are not shared.
135

136
    Examples:
137
    # Acquire all nodes and one instance
138
    self.needed_locks = {
139
      locking.LEVEL_NODE: None,
140
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
141
    }
142
    # Acquire just two nodes
143
    self.needed_locks = {
144
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
145
    }
146
    # Acquire no locks
147
    self.needed_locks = {} # No, you can't leave it to the default value None
148

149
    """
150
    # The implementation of this method is mandatory only if the new LU is
151
    # concurrent, so that old LUs don't need to be changed all at the same
152
    # time.
153
    if self.REQ_BGL:
154
      self.needed_locks = {} # Exclusive LUs don't need locks.
155
    else:
156
      raise NotImplementedError
157

    
158
  def DeclareLocks(self, level):
159
    """Declare LU locking needs for a level
160

161
    While most LUs can just declare their locking needs at ExpandNames time,
162
    sometimes there's the need to calculate some locks after having acquired
163
    the ones before. This function is called just before acquiring locks at a
164
    particular level, but after acquiring the ones at lower levels, and permits
165
    such calculations. It can be used to modify self.needed_locks, and by
166
    default it does nothing.
167

168
    This function is only called if you have something already set in
169
    self.needed_locks for the level.
170

171
    @param level: Locking level which is going to be locked
172
    @type level: member of ganeti.locking.LEVELS
173

174
    """
175

    
176
  def CheckPrereq(self):
177
    """Check prerequisites for this LU.
178

179
    This method should check that the prerequisites for the execution
180
    of this LU are fulfilled. It can do internode communication, but
181
    it should be idempotent - no cluster or system changes are
182
    allowed.
183

184
    The method should raise errors.OpPrereqError in case something is
185
    not fulfilled. Its return value is ignored.
186

187
    This method should also update all the parameters of the opcode to
188
    their canonical form if it hasn't been done by ExpandNames before.
189

190
    """
191
    raise NotImplementedError
192

    
193
  def Exec(self, feedback_fn):
194
    """Execute the LU.
195

196
    This method should implement the actual work. It should raise
197
    errors.OpExecError for failures that are somewhat dealt with in
198
    code, or expected.
199

200
    """
201
    raise NotImplementedError
202

    
203
  def BuildHooksEnv(self):
204
    """Build hooks environment for this LU.
205

206
    This method should return a three-node tuple consisting of: a dict
207
    containing the environment that will be used for running the
208
    specific hook for this LU, a list of node names on which the hook
209
    should run before the execution, and a list of node names on which
210
    the hook should run after the execution.
211

212
    The keys of the dict must not be prefixed with 'GANETI_', as this will
213
    be handled in the hooks runner. Also note additional keys will be
214
    added by the hooks runner. If the LU doesn't define any
215
    environment, an empty dict (and not None) should be returned.
216

217
    If no nodes are needed, an empty list (and not None) should be returned.
218

219
    Note that if the HPATH for a LU class is None, this function will
220
    not be called.
221

222
    """
223
    raise NotImplementedError
224

    
225
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
226
    """Notify the LU about the results of its hooks.
227

228
    This method is called every time a hooks phase is executed, and notifies
229
    the Logical Unit about the hooks' result. The LU can then use it to alter
230
    its result based on the hooks.  By default the method does nothing and the
231
    previous result is passed back unchanged but any LU can define it if it
232
    wants to use the local cluster hook-scripts somehow.
233

234
    Args:
235
      phase: the hooks phase that has just been run
236
      hook_results: the results of the multi-node hooks rpc call
237
      feedback_fn: function to send feedback back to the caller
238
      lu_result: the previous result this LU had, or None in the PRE phase.
239

240
    """
241
    return lu_result
242

    
243
  def _ExpandAndLockInstance(self):
244
    """Helper function to expand and lock an instance.
245

246
    Many LUs that work on an instance take its name in self.op.instance_name
247
    and need to expand it and then declare the expanded name for locking. This
248
    function does it, and then updates self.op.instance_name to the expanded
249
    name. It also initializes needed_locks as a dict, if this hasn't been done
250
    before.
251

252
    """
253
    if self.needed_locks is None:
254
      self.needed_locks = {}
255
    else:
256
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
257
        "_ExpandAndLockInstance called with instance-level locks set"
258
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
259
    if expanded_name is None:
260
      raise errors.OpPrereqError("Instance '%s' not known" %
261
                                  self.op.instance_name)
262
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
263
    self.op.instance_name = expanded_name
264

    
265

    
266
class NoHooksLU(LogicalUnit):
267
  """Simple LU which runs no hooks.
268

269
  This LU is intended as a parent for other LogicalUnits which will
270
  run no hooks, in order to reduce duplicate code.
271

272
  """
273
  HPATH = None
274
  HTYPE = None
275

    
276

    
277
def _GetWantedNodes(lu, nodes):
278
  """Returns list of checked and expanded node names.
279

280
  Args:
281
    nodes: List of nodes (strings) or None for all
282

283
  """
284
  if not isinstance(nodes, list):
285
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
286

    
287
  if nodes:
288
    wanted = []
289

    
290
    for name in nodes:
291
      node = lu.cfg.ExpandNodeName(name)
292
      if node is None:
293
        raise errors.OpPrereqError("No such node name '%s'" % name)
294
      wanted.append(node)
295

    
296
  else:
297
    wanted = lu.cfg.GetNodeList()
298
  return utils.NiceSort(wanted)
299

    
300

    
301
def _GetWantedInstances(lu, instances):
302
  """Returns list of checked and expanded instance names.
303

304
  Args:
305
    instances: List of instances (strings) or None for all
306

307
  """
308
  if not isinstance(instances, list):
309
    raise errors.OpPrereqError("Invalid argument type 'instances'")
310

    
311
  if instances:
312
    wanted = []
313

    
314
    for name in instances:
315
      instance = lu.cfg.ExpandInstanceName(name)
316
      if instance is None:
317
        raise errors.OpPrereqError("No such instance name '%s'" % name)
318
      wanted.append(instance)
319

    
320
  else:
321
    wanted = lu.cfg.GetInstanceList()
322
  return utils.NiceSort(wanted)
323

    
324

    
325
def _CheckOutputFields(static, dynamic, selected):
326
  """Checks whether all selected fields are valid.
327

328
  Args:
329
    static: Static fields
330
    dynamic: Dynamic fields
331

332
  """
333
  static_fields = frozenset(static)
334
  dynamic_fields = frozenset(dynamic)
335

    
336
  all_fields = static_fields | dynamic_fields
337

    
338
  if not all_fields.issuperset(selected):
339
    raise errors.OpPrereqError("Unknown output fields selected: %s"
340
                               % ",".join(frozenset(selected).
341
                                          difference(all_fields)))
342

    
343

    
344
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
345
                          memory, vcpus, nics):
346
  """Builds instance related env variables for hooks from single variables.
347

348
  Args:
349
    secondary_nodes: List of secondary nodes as strings
350
  """
351
  env = {
352
    "OP_TARGET": name,
353
    "INSTANCE_NAME": name,
354
    "INSTANCE_PRIMARY": primary_node,
355
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
356
    "INSTANCE_OS_TYPE": os_type,
357
    "INSTANCE_STATUS": status,
358
    "INSTANCE_MEMORY": memory,
359
    "INSTANCE_VCPUS": vcpus,
360
  }
361

    
362
  if nics:
363
    nic_count = len(nics)
364
    for idx, (ip, bridge, mac) in enumerate(nics):
365
      if ip is None:
366
        ip = ""
367
      env["INSTANCE_NIC%d_IP" % idx] = ip
368
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
369
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
370
  else:
371
    nic_count = 0
372

    
373
  env["INSTANCE_NIC_COUNT"] = nic_count
374

    
375
  return env
376

    
377

    
378
def _BuildInstanceHookEnvByObject(instance, override=None):
379
  """Builds instance related env variables for hooks from an object.
380

381
  Args:
382
    instance: objects.Instance object of instance
383
    override: dict of values to override
384
  """
385
  args = {
386
    'name': instance.name,
387
    'primary_node': instance.primary_node,
388
    'secondary_nodes': instance.secondary_nodes,
389
    'os_type': instance.os,
390
    'status': instance.status,
391
    'memory': instance.memory,
392
    'vcpus': instance.vcpus,
393
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
394
  }
395
  if override:
396
    args.update(override)
397
  return _BuildInstanceHookEnv(**args)
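  # Illustrative usage sketch: callers may override selected values, e.g.
  #   env = _BuildInstanceHookEnvByObject(instance, override={"status": "up"})
  # forces INSTANCE_STATUS to "up" while keeping the values derived from the
  # instance object for everything else.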
398

    
399

    
400
def _CheckInstanceBridgesExist(instance):
401
  """Check that the brigdes needed by an instance exist.
402

403
  """
404
  # check bridges existence
405
  brlist = [nic.bridge for nic in instance.nics]
406
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
407
    raise errors.OpPrereqError("one or more target bridges %s does not"
408
                               " exist on destination node '%s'" %
409
                               (brlist, instance.primary_node))
410

    
411

    
412
class LUDestroyCluster(NoHooksLU):
413
  """Logical unit for destroying the cluster.
414

415
  """
416
  _OP_REQP = []
417

    
418
  def CheckPrereq(self):
419
    """Check prerequisites.
420

421
    This checks whether the cluster is empty.
422

423
    Any errors are signalled by raising errors.OpPrereqError.
424

425
    """
426
    master = self.sstore.GetMasterNode()
427

    
428
    nodelist = self.cfg.GetNodeList()
429
    if len(nodelist) != 1 or nodelist[0] != master:
430
      raise errors.OpPrereqError("There are still %d node(s) in"
431
                                 " this cluster." % (len(nodelist) - 1))
432
    instancelist = self.cfg.GetInstanceList()
433
    if instancelist:
434
      raise errors.OpPrereqError("There are still %d instance(s) in"
435
                                 " this cluster." % len(instancelist))
436

    
437
  def Exec(self, feedback_fn):
438
    """Destroys the cluster.
439

440
    """
441
    master = self.sstore.GetMasterNode()
442
    if not rpc.call_node_stop_master(master, False):
443
      raise errors.OpExecError("Could not disable the master role")
444
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
445
    utils.CreateBackup(priv_key)
446
    utils.CreateBackup(pub_key)
447
    rpc.call_node_leave_cluster(master)
448

    
449

    
450
class LUVerifyCluster(LogicalUnit):
451
  """Verifies the cluster status.
452

453
  """
454
  HPATH = "cluster-verify"
455
  HTYPE = constants.HTYPE_CLUSTER
456
  _OP_REQP = ["skip_checks"]
457

    
458
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
459
                  remote_version, feedback_fn):
460
    """Run multiple tests against a node.
461

462
    Test list:
463
      - compares ganeti version
464
      - checks vg existence and size > 20G
465
      - checks config file checksum
466
      - checks ssh to other nodes
467

468
    Args:
469
      node: name of the node to check
470
      file_list: required list of files
471
      local_cksum: dictionary of local files and their checksums
472

473
    """
474
    # compares ganeti version
475
    local_version = constants.PROTOCOL_VERSION
476
    if not remote_version:
477
      feedback_fn("  - ERROR: connection to %s failed" % (node))
478
      return True
479

    
480
    if local_version != remote_version:
481
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
482
                      (local_version, node, remote_version))
483
      return True
484

    
485
    # checks vg existence and size > 20G
486

    
487
    bad = False
488
    if not vglist:
489
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
490
                      (node,))
491
      bad = True
492
    else:
493
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
494
                                            constants.MIN_VG_SIZE)
495
      if vgstatus:
496
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
497
        bad = True
498

    
499
    # checks config file checksum
500
    # checks ssh to any
501

    
502
    if 'filelist' not in node_result:
503
      bad = True
504
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
505
    else:
506
      remote_cksum = node_result['filelist']
507
      for file_name in file_list:
508
        if file_name not in remote_cksum:
509
          bad = True
510
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
511
        elif remote_cksum[file_name] != local_cksum[file_name]:
512
          bad = True
513
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
514

    
515
    if 'nodelist' not in node_result:
516
      bad = True
517
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
518
    else:
519
      if node_result['nodelist']:
520
        bad = True
521
        for node in node_result['nodelist']:
522
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
523
                          (node, node_result['nodelist'][node]))
524
    if 'node-net-test' not in node_result:
525
      bad = True
526
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
527
    else:
528
      if node_result['node-net-test']:
529
        bad = True
530
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
531
        for node in nlist:
532
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
533
                          (node, node_result['node-net-test'][node]))
534

    
535
    hyp_result = node_result.get('hypervisor', None)
536
    if hyp_result is not None:
537
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
538
    return bad
539

    
540
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
541
                      node_instance, feedback_fn):
542
    """Verify an instance.
543

544
    This function checks to see if the required block devices are
545
    available on the instance's node.
546

547
    """
548
    bad = False
549

    
550
    node_current = instanceconfig.primary_node
551

    
552
    node_vol_should = {}
553
    instanceconfig.MapLVsByNode(node_vol_should)
554

    
555
    for node in node_vol_should:
556
      for volume in node_vol_should[node]:
557
        if node not in node_vol_is or volume not in node_vol_is[node]:
558
          feedback_fn("  - ERROR: volume %s missing on node %s" %
559
                          (volume, node))
560
          bad = True
561

    
562
    if instanceconfig.status != 'down':
563
      if (node_current not in node_instance or
564
          instance not in node_instance[node_current]):
565
        feedback_fn("  - ERROR: instance %s not running on node %s" %
566
                        (instance, node_current))
567
        bad = True
568

    
569
    for node in node_instance:
570
      if node != node_current:
571
        if instance in node_instance[node]:
572
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
573
                          (instance, node))
574
          bad = True
575

    
576
    return bad
577

    
578
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
579
    """Verify if there are any unknown volumes in the cluster.
580

581
    The .os, .swap and backup volumes are ignored. All other volumes are
582
    reported as unknown.
583

584
    """
585
    bad = False
586

    
587
    for node in node_vol_is:
588
      for volume in node_vol_is[node]:
589
        if node not in node_vol_should or volume not in node_vol_should[node]:
590
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
591
                      (volume, node))
592
          bad = True
593
    return bad
594

    
595
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
596
    """Verify the list of running instances.
597

598
    This checks what instances are running but unknown to the cluster.
599

600
    """
601
    bad = False
602
    for node in node_instance:
603
      for runninginstance in node_instance[node]:
604
        if runninginstance not in instancelist:
605
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
606
                          (runninginstance, node))
607
          bad = True
608
    return bad
609

    
610
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
611
    """Verify N+1 Memory Resilience.
612

613
    Check that if one single node dies we can still start all the instances it
614
    was primary for.
615

616
    """
617
    bad = False
618

    
619
    for node, nodeinfo in node_info.iteritems():
620
      # This code checks that every node which is now listed as secondary has
621
      # enough memory to host all the instances it is secondary for, should
622
      # a single other node in the cluster fail.
623
      # FIXME: not ready for failover to an arbitrary node
624
      # FIXME: does not support file-backed instances
625
      # WARNING: we currently take into account down instances as well as up
626
      # ones, considering that even if they're down someone might want to start
627
      # them even in the event of a node failure.
628
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
629
        needed_mem = 0
630
        for instance in instances:
631
          needed_mem += instance_cfg[instance].memory
632
        if nodeinfo['mfree'] < needed_mem:
633
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
634
                      " failovers should node %s fail" % (node, prinode))
635
          bad = True
636
    return bad
637

    
638
  def CheckPrereq(self):
639
    """Check prerequisites.
640

641
    Transform the list of checks we're going to skip into a set and check that
642
    all its members are valid.
643

644
    """
645
    self.skip_set = frozenset(self.op.skip_checks)
646
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
647
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
648

    
649
  def BuildHooksEnv(self):
650
    """Build hooks env.
651

652
    Cluster-Verify hooks are run only in the post phase; on failure their
653
    output is logged in the verify output and the verification fails.
654

655
    """
656
    all_nodes = self.cfg.GetNodeList()
657
    # TODO: populate the environment with useful information for verify hooks
658
    env = {}
659
    return env, [], all_nodes
660

    
661
  def Exec(self, feedback_fn):
662
    """Verify integrity of cluster, performing various test on nodes.
663

664
    """
665
    bad = False
666
    feedback_fn("* Verifying global settings")
667
    for msg in self.cfg.VerifyConfig():
668
      feedback_fn("  - ERROR: %s" % msg)
669

    
670
    vg_name = self.cfg.GetVGName()
671
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
672
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
673
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
674
    i_non_redundant = [] # Non redundant instances
675
    node_volume = {}
676
    node_instance = {}
677
    node_info = {}
678
    instance_cfg = {}
679

    
680
    # FIXME: verify OS list
681
    # do local checksums
682
    file_names = list(self.sstore.GetFileList())
683
    file_names.append(constants.SSL_CERT_FILE)
684
    file_names.append(constants.CLUSTER_CONF_FILE)
685
    local_checksums = utils.FingerprintFiles(file_names)
686

    
687
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
688
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
689
    all_instanceinfo = rpc.call_instance_list(nodelist)
690
    all_vglist = rpc.call_vg_list(nodelist)
691
    node_verify_param = {
692
      'filelist': file_names,
693
      'nodelist': nodelist,
694
      'hypervisor': None,
695
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
696
                        for node in nodeinfo]
697
      }
698
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
699
    all_rversion = rpc.call_version(nodelist)
700
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
701

    
702
    for node in nodelist:
703
      feedback_fn("* Verifying node %s" % node)
704
      result = self._VerifyNode(node, file_names, local_checksums,
705
                                all_vglist[node], all_nvinfo[node],
706
                                all_rversion[node], feedback_fn)
707
      bad = bad or result
708

    
709
      # node_volume
710
      volumeinfo = all_volumeinfo[node]
711

    
712
      if isinstance(volumeinfo, basestring):
713
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
714
                    (node, volumeinfo[-400:].encode('string_escape')))
715
        bad = True
716
        node_volume[node] = {}
717
      elif not isinstance(volumeinfo, dict):
718
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
719
        bad = True
720
        continue
721
      else:
722
        node_volume[node] = volumeinfo
723

    
724
      # node_instance
725
      nodeinstance = all_instanceinfo[node]
726
      if not isinstance(nodeinstance, list):
727
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
728
        bad = True
729
        continue
730

    
731
      node_instance[node] = nodeinstance
732

    
733
      # node_info
734
      nodeinfo = all_ninfo[node]
735
      if not isinstance(nodeinfo, dict):
736
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
737
        bad = True
738
        continue
739

    
740
      try:
741
        node_info[node] = {
742
          "mfree": int(nodeinfo['memory_free']),
743
          "dfree": int(nodeinfo['vg_free']),
744
          "pinst": [],
745
          "sinst": [],
746
          # dictionary holding all instances this node is secondary for,
747
          # grouped by their primary node. Each key is a cluster node, and each
748
          # value is a list of instances which have the key as primary and the
749
          # current node as secondary.  this is handy to calculate N+1 memory
750
          # availability if you can only failover from a primary to its
751
          # secondary.
752
          "sinst-by-pnode": {},
753
        }
754
      except ValueError:
755
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
756
        bad = True
757
        continue
758

    
759
    node_vol_should = {}
760

    
761
    for instance in instancelist:
762
      feedback_fn("* Verifying instance %s" % instance)
763
      inst_config = self.cfg.GetInstanceInfo(instance)
764
      result =  self._VerifyInstance(instance, inst_config, node_volume,
765
                                     node_instance, feedback_fn)
766
      bad = bad or result
767

    
768
      inst_config.MapLVsByNode(node_vol_should)
769

    
770
      instance_cfg[instance] = inst_config
771

    
772
      pnode = inst_config.primary_node
773
      if pnode in node_info:
774
        node_info[pnode]['pinst'].append(instance)
775
      else:
776
        feedback_fn("  - ERROR: instance %s, connection to primary node"
777
                    " %s failed" % (instance, pnode))
778
        bad = True
779

    
780
      # If the instance is non-redundant we cannot survive losing its primary
781
      # node, so we are not N+1 compliant. On the other hand we have no disk
782
      # templates with more than one secondary so that situation is not well
783
      # supported either.
784
      # FIXME: does not support file-backed instances
785
      if len(inst_config.secondary_nodes) == 0:
786
        i_non_redundant.append(instance)
787
      elif len(inst_config.secondary_nodes) > 1:
788
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
789
                    % instance)
790

    
791
      for snode in inst_config.secondary_nodes:
792
        if snode in node_info:
793
          node_info[snode]['sinst'].append(instance)
794
          if pnode not in node_info[snode]['sinst-by-pnode']:
795
            node_info[snode]['sinst-by-pnode'][pnode] = []
796
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
797
        else:
798
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
799
                      " %s failed" % (instance, snode))
800

    
801
    feedback_fn("* Verifying orphan volumes")
802
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
803
                                       feedback_fn)
804
    bad = bad or result
805

    
806
    feedback_fn("* Verifying remaining instances")
807
    result = self._VerifyOrphanInstances(instancelist, node_instance,
808
                                         feedback_fn)
809
    bad = bad or result
810

    
811
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
812
      feedback_fn("* Verifying N+1 Memory redundancy")
813
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
814
      bad = bad or result
815

    
816
    feedback_fn("* Other Notes")
817
    if i_non_redundant:
818
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
819
                  % len(i_non_redundant))
820

    
821
    return int(bad)
822

    
823
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
824
    """Analize the post-hooks' result, handle it, and send some
825
    nicely-formatted feedback back to the user.
826

827
    Args:
828
      phase: the hooks phase that has just been run
829
      hooks_results: the results of the multi-node hooks rpc call
830
      feedback_fn: function to send feedback back to the caller
831
      lu_result: previous Exec result
832

833
    """
834
    # We only really run POST phase hooks, and are only interested in
    # their results
835
    if phase == constants.HOOKS_PHASE_POST:
836
      # Used to change hooks' output to proper indentation
837
      indent_re = re.compile('^', re.M)
838
      feedback_fn("* Hooks Results")
839
      if not hooks_results:
840
        feedback_fn("  - ERROR: general communication failure")
841
        lu_result = 1
842
      else:
843
        for node_name in hooks_results:
844
          show_node_header = True
845
          res = hooks_results[node_name]
846
          if res is False or not isinstance(res, list):
847
            feedback_fn("    Communication failure")
848
            lu_result = 1
849
            continue
850
          for script, hkr, output in res:
851
            if hkr == constants.HKR_FAIL:
852
              # The node header is only shown once, if there are
853
              # failing hooks on that node
854
              if show_node_header:
855
                feedback_fn("  Node %s:" % node_name)
856
                show_node_header = False
857
              feedback_fn("    ERROR: Script %s failed, output:" % script)
858
              output = indent_re.sub('      ', output)
859
              feedback_fn("%s" % output)
860
              lu_result = 1
861

    
862
      return lu_result
863

    
864

    
865
class LUVerifyDisks(NoHooksLU):
866
  """Verifies the cluster disks status.
867

868
  """
869
  _OP_REQP = []
870

    
871
  def CheckPrereq(self):
872
    """Check prerequisites.
873

874
    This has no prerequisites.
875

876
    """
877
    pass
878

    
879
  def Exec(self, feedback_fn):
880
    """Verify integrity of cluster disks.
881

882
    """
883
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
884

    
885
    vg_name = self.cfg.GetVGName()
886
    nodes = utils.NiceSort(self.cfg.GetNodeList())
887
    instances = [self.cfg.GetInstanceInfo(name)
888
                 for name in self.cfg.GetInstanceList()]
889

    
890
    nv_dict = {}
891
    for inst in instances:
892
      inst_lvs = {}
893
      if (inst.status != "up" or
894
          inst.disk_template not in constants.DTS_NET_MIRROR):
895
        continue
896
      inst.MapLVsByNode(inst_lvs)
897
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
898
      for node, vol_list in inst_lvs.iteritems():
899
        for vol in vol_list:
900
          nv_dict[(node, vol)] = inst
901

    
902
    if not nv_dict:
903
      return result
904

    
905
    node_lvs = rpc.call_volume_list(nodes, vg_name)
906

    
907
    to_act = set()
908
    for node in nodes:
909
      # node_volume
910
      lvs = node_lvs[node]
911

    
912
      if isinstance(lvs, basestring):
913
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
914
        res_nlvm[node] = lvs
915
      elif not isinstance(lvs, dict):
916
        logger.Info("connection to node %s failed or invalid data returned" %
917
                    (node,))
918
        res_nodes.append(node)
919
        continue
920

    
921
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
922
        inst = nv_dict.pop((node, lv_name), None)
923
        if (not lv_online and inst is not None
924
            and inst.name not in res_instances):
925
          res_instances.append(inst.name)
926

    
927
    # any leftover items in nv_dict are missing LVs, let's arrange the
928
    # data better
929
    for key, inst in nv_dict.iteritems():
930
      if inst.name not in res_missing:
931
        res_missing[inst.name] = []
932
      res_missing[inst.name].append(key)
933

    
934
    return result
935

    
936

    
937
class LURenameCluster(LogicalUnit):
938
  """Rename the cluster.
939

940
  """
941
  HPATH = "cluster-rename"
942
  HTYPE = constants.HTYPE_CLUSTER
943
  _OP_REQP = ["name"]
944
  REQ_WSSTORE = True
945

    
946
  def BuildHooksEnv(self):
947
    """Build hooks env.
948

949
    """
950
    env = {
951
      "OP_TARGET": self.sstore.GetClusterName(),
952
      "NEW_NAME": self.op.name,
953
      }
954
    mn = self.sstore.GetMasterNode()
955
    return env, [mn], [mn]
956

    
957
  def CheckPrereq(self):
958
    """Verify that the passed name is a valid one.
959

960
    """
961
    hostname = utils.HostInfo(self.op.name)
962

    
963
    new_name = hostname.name
964
    self.ip = new_ip = hostname.ip
965
    old_name = self.sstore.GetClusterName()
966
    old_ip = self.sstore.GetMasterIP()
967
    if new_name == old_name and new_ip == old_ip:
968
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
969
                                 " cluster has changed")
970
    if new_ip != old_ip:
971
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
972
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
973
                                   " reachable on the network. Aborting." %
974
                                   new_ip)
975

    
976
    self.op.name = new_name
977

    
978
  def Exec(self, feedback_fn):
979
    """Rename the cluster.
980

981
    """
982
    clustername = self.op.name
983
    ip = self.ip
984
    ss = self.sstore
985

    
986
    # shutdown the master IP
987
    master = ss.GetMasterNode()
988
    if not rpc.call_node_stop_master(master, False):
989
      raise errors.OpExecError("Could not disable the master role")
990

    
991
    try:
992
      # modify the sstore
993
      ss.SetKey(ss.SS_MASTER_IP, ip)
994
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
995

    
996
      # Distribute updated ss config to all nodes
997
      myself = self.cfg.GetNodeInfo(master)
998
      dist_nodes = self.cfg.GetNodeList()
999
      if myself.name in dist_nodes:
1000
        dist_nodes.remove(myself.name)
1001

    
1002
      logger.Debug("Copying updated ssconf data to all nodes")
1003
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1004
        fname = ss.KeyToFilename(keyname)
1005
        result = rpc.call_upload_file(dist_nodes, fname)
1006
        for to_node in dist_nodes:
1007
          if not result[to_node]:
1008
            logger.Error("copy of file %s to node %s failed" %
1009
                         (fname, to_node))
1010
    finally:
1011
      if not rpc.call_node_start_master(master, False):
1012
        logger.Error("Could not re-enable the master role on the master,"
1013
                     " please restart manually.")
1014

    
1015

    
1016
def _RecursiveCheckIfLVMBased(disk):
1017
  """Check if the given disk or its children are lvm-based.
1018

1019
  Args:
1020
    disk: ganeti.objects.Disk object
1021

1022
  Returns:
1023
    boolean indicating whether a LD_LV dev_type was found or not
1024

1025
  """
1026
  if disk.children:
1027
    for chdisk in disk.children:
1028
      if _RecursiveCheckIfLVMBased(chdisk):
1029
        return True
1030
  return disk.dev_type == constants.LD_LV
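  # Illustrative note: for a DRBD disk whose children are two LV-backed
  # devices, the recursion above returns True as soon as an LV child is found.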
1031

    
1032

    
1033
class LUSetClusterParams(LogicalUnit):
1034
  """Change the parameters of the cluster.
1035

1036
  """
1037
  HPATH = "cluster-modify"
1038
  HTYPE = constants.HTYPE_CLUSTER
1039
  _OP_REQP = []
1040

    
1041
  def BuildHooksEnv(self):
1042
    """Build hooks env.
1043

1044
    """
1045
    env = {
1046
      "OP_TARGET": self.sstore.GetClusterName(),
1047
      "NEW_VG_NAME": self.op.vg_name,
1048
      }
1049
    mn = self.sstore.GetMasterNode()
1050
    return env, [mn], [mn]
1051

    
1052
  def CheckPrereq(self):
1053
    """Check prerequisites.
1054

1055
    This checks whether the given params don't conflict and
1056
    if the given volume group is valid.
1057

1058
    """
1059
    if not self.op.vg_name:
1060
      instances = [self.cfg.GetInstanceInfo(name)
1061
                   for name in self.cfg.GetInstanceList()]
1062
      for inst in instances:
1063
        for disk in inst.disks:
1064
          if _RecursiveCheckIfLVMBased(disk):
1065
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1066
                                       " lvm-based instances exist")
1067

    
1068
    # if vg_name not None, checks given volume group on all nodes
1069
    if self.op.vg_name:
1070
      node_list = self.cfg.GetNodeList()
1071
      vglist = rpc.call_vg_list(node_list)
1072
      for node in node_list:
1073
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1074
                                              constants.MIN_VG_SIZE)
1075
        if vgstatus:
1076
          raise errors.OpPrereqError("Error on node '%s': %s" %
1077
                                     (node, vgstatus))
1078

    
1079
  def Exec(self, feedback_fn):
1080
    """Change the parameters of the cluster.
1081

1082
    """
1083
    if self.op.vg_name != self.cfg.GetVGName():
1084
      self.cfg.SetVGName(self.op.vg_name)
1085
    else:
1086
      feedback_fn("Cluster LVM configuration already in desired"
1087
                  " state, not changing")
1088

    
1089

    
1090
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1091
  """Sleep and poll for an instance's disk to sync.
1092

1093
  """
1094
  if not instance.disks:
1095
    return True
1096

    
1097
  if not oneshot:
1098
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1099

    
1100
  node = instance.primary_node
1101

    
1102
  for dev in instance.disks:
1103
    cfgw.SetDiskID(dev, node)
1104

    
1105
  retries = 0
1106
  while True:
1107
    max_time = 0
1108
    done = True
1109
    cumul_degraded = False
1110
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1111
    if not rstats:
1112
      proc.LogWarning("Can't get any data from node %s" % node)
1113
      retries += 1
1114
      if retries >= 10:
1115
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1116
                                 " aborting." % node)
1117
      time.sleep(6)
1118
      continue
1119
    retries = 0
1120
    for i in range(len(rstats)):
1121
      mstat = rstats[i]
1122
      if mstat is None:
1123
        proc.LogWarning("Can't compute data for node %s/%s" %
1124
                        (node, instance.disks[i].iv_name))
1125
        continue
1126
      # we ignore the ldisk parameter
1127
      perc_done, est_time, is_degraded, _ = mstat
1128
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1129
      if perc_done is not None:
1130
        done = False
1131
        if est_time is not None:
1132
          rem_time = "%d estimated seconds remaining" % est_time
1133
          max_time = est_time
1134
        else:
1135
          rem_time = "no time estimate"
1136
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
1137
                     (instance.disks[i].iv_name, perc_done, rem_time))
1138
    if done or oneshot:
1139
      break
1140

    
1141
    time.sleep(min(60, max_time))
1142

    
1143
  if done:
1144
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1145
  return not cumul_degraded
1146

    
1147

    
1148
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1149
  """Check that mirrors are not degraded.
1150

1151
  The ldisk parameter, if True, will change the test from the
1152
  is_degraded attribute (which represents overall non-ok status for
1153
  the device(s)) to the ldisk (representing the local storage status).
1154

1155
  """
1156
  cfgw.SetDiskID(dev, node)
1157
  if ldisk:
1158
    idx = 6
1159
  else:
1160
    idx = 5
1161

    
1162
  result = True
1163
  if on_primary or dev.AssembleOnSecondary():
1164
    rstats = rpc.call_blockdev_find(node, dev)
1165
    if not rstats:
1166
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1167
      result = False
1168
    else:
1169
      result = result and (not rstats[idx])
1170
  if dev.children:
1171
    for child in dev.children:
1172
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1173

    
1174
  return result
1175

    
1176

    
1177
class LUDiagnoseOS(NoHooksLU):
1178
  """Logical unit for OS diagnose/query.
1179

1180
  """
1181
  _OP_REQP = ["output_fields", "names"]
1182

    
1183
  def CheckPrereq(self):
1184
    """Check prerequisites.
1185

1186
    This always succeeds, since this is a pure query LU.
1187

1188
    """
1189
    if self.op.names:
1190
      raise errors.OpPrereqError("Selective OS query not supported")
1191

    
1192
    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1193
    _CheckOutputFields(static=[],
1194
                       dynamic=self.dynamic_fields,
1195
                       selected=self.op.output_fields)
1196

    
1197
  @staticmethod
1198
  def _DiagnoseByOS(node_list, rlist):
1199
    """Remaps a per-node return list into an a per-os per-node dictionary
1200

1201
      Args:
1202
        node_list: a list with the names of all nodes
1203
        rlist: a map with node names as keys and OS objects as values
1204

1205
      Returns:
1206
        map: a map with osnames as keys and as value another map, with
1207
             nodes as
1208
             keys and list of OS objects as values
1209
             e.g. {"debian-etch": {"node1": [<object>,...],
1210
                                   "node2": [<object>,]}
1211
                  }
1212

1213
    """
1214
    all_os = {}
1215
    for node_name, nr in rlist.iteritems():
1216
      if not nr:
1217
        continue
1218
      for os_obj in nr:
1219
        if os_obj.name not in all_os:
1220
          # build a list of nodes for this os containing empty lists
1221
          # for each node in node_list
1222
          all_os[os_obj.name] = {}
1223
          for nname in node_list:
1224
            all_os[os_obj.name][nname] = []
1225
        all_os[os_obj.name][node_name].append(os_obj)
1226
    return all_os
1227

    
1228
  def Exec(self, feedback_fn):
1229
    """Compute the list of OSes.
1230

1231
    """
1232
    node_list = self.cfg.GetNodeList()
1233
    node_data = rpc.call_os_diagnose(node_list)
1234
    if node_data is False:
1235
      raise errors.OpExecError("Can't gather the list of OSes")
1236
    pol = self._DiagnoseByOS(node_list, node_data)
1237
    output = []
1238
    for os_name, os_data in pol.iteritems():
1239
      row = []
1240
      for field in self.op.output_fields:
1241
        if field == "name":
1242
          val = os_name
1243
        elif field == "valid":
1244
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1245
        elif field == "node_status":
1246
          val = {}
1247
          for node_name, nos_list in os_data.iteritems():
1248
            val[node_name] = [(v.status, v.path) for v in nos_list]
1249
        else:
1250
          raise errors.ParameterError(field)
1251
        row.append(val)
1252
      output.append(row)
1253

    
1254
    return output
1255

    
1256

    
1257
class LURemoveNode(LogicalUnit):
1258
  """Logical unit for removing a node.
1259

1260
  """
1261
  HPATH = "node-remove"
1262
  HTYPE = constants.HTYPE_NODE
1263
  _OP_REQP = ["node_name"]
1264

    
1265
  def BuildHooksEnv(self):
1266
    """Build hooks env.
1267

1268
    This doesn't run on the target node in the pre phase as a failed
1269
    node would then be impossible to remove.
1270

1271
    """
1272
    env = {
1273
      "OP_TARGET": self.op.node_name,
1274
      "NODE_NAME": self.op.node_name,
1275
      }
1276
    all_nodes = self.cfg.GetNodeList()
1277
    all_nodes.remove(self.op.node_name)
1278
    return env, all_nodes, all_nodes
1279

    
1280
  def CheckPrereq(self):
1281
    """Check prerequisites.
1282

1283
    This checks:
1284
     - the node exists in the configuration
1285
     - it does not have primary or secondary instances
1286
     - it's not the master
1287

1288
    Any errors are signalled by raising errors.OpPrereqError.
1289

1290
    """
1291
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1292
    if node is None:
1293
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1294

    
1295
    instance_list = self.cfg.GetInstanceList()
1296

    
1297
    masternode = self.sstore.GetMasterNode()
1298
    if node.name == masternode:
1299
      raise errors.OpPrereqError("Node is the master node,"
1300
                                 " you need to failover first.")
1301

    
1302
    for instance_name in instance_list:
1303
      instance = self.cfg.GetInstanceInfo(instance_name)
1304
      if node.name == instance.primary_node:
1305
        raise errors.OpPrereqError("Instance %s still running on the node,"
1306
                                   " please remove first." % instance_name)
1307
      if node.name in instance.secondary_nodes:
1308
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1309
                                   " please remove first." % instance_name)
1310
    self.op.node_name = node.name
1311
    self.node = node
1312

    
1313
  def Exec(self, feedback_fn):
1314
    """Removes the node from the cluster.
1315

1316
    """
1317
    node = self.node
1318
    logger.Info("stopping the node daemon and removing configs from node %s" %
1319
                node.name)
1320

    
1321
    rpc.call_node_leave_cluster(node.name)
1322

    
1323
    logger.Info("Removing node %s from config" % node.name)
1324

    
1325
    self.cfg.RemoveNode(node.name)
1326
    # Remove the node from the Ganeti Lock Manager
1327
    self.context.glm.remove(locking.LEVEL_NODE, node.name)
1328

    
1329
    utils.RemoveHostFromEtcHosts(node.name)
1330

    
1331

    
1332
class LUQueryNodes(NoHooksLU):
1333
  """Logical unit for querying nodes.
1334

1335
  """
1336
  _OP_REQP = ["output_fields", "names"]
1337

    
1338
  def CheckPrereq(self):
1339
    """Check prerequisites.
1340

1341
    This checks that the fields required are valid output fields.
1342

1343
    """
1344
    self.dynamic_fields = frozenset([
1345
      "dtotal", "dfree",
1346
      "mtotal", "mnode", "mfree",
1347
      "bootid",
1348
      "ctotal",
1349
      ])
1350

    
1351
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1352
                               "pinst_list", "sinst_list",
1353
                               "pip", "sip", "tags"],
1354
                       dynamic=self.dynamic_fields,
1355
                       selected=self.op.output_fields)
1356

    
1357
    self.wanted = _GetWantedNodes(self, self.op.names)
1358

    
1359
  def Exec(self, feedback_fn):
1360
    """Computes the list of nodes and their attributes.
1361

1362
    """
1363
    nodenames = self.wanted
1364
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1365

    
1366
    # begin data gathering
1367

    
1368
    if self.dynamic_fields.intersection(self.op.output_fields):
1369
      live_data = {}
1370
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1371
      for name in nodenames:
1372
        nodeinfo = node_data.get(name, None)
1373
        if nodeinfo:
1374
          live_data[name] = {
1375
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1376
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1377
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1378
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1379
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1380
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1381
            "bootid": nodeinfo['bootid'],
1382
            }
1383
        else:
1384
          live_data[name] = {}
1385
    else:
1386
      live_data = dict.fromkeys(nodenames, {})
1387

    
1388
    node_to_primary = dict([(name, set()) for name in nodenames])
1389
    node_to_secondary = dict([(name, set()) for name in nodenames])
1390

    
1391
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1392
                             "sinst_cnt", "sinst_list"))
1393
    if inst_fields & frozenset(self.op.output_fields):
1394
      instancelist = self.cfg.GetInstanceList()
1395

    
1396
      for instance_name in instancelist:
1397
        inst = self.cfg.GetInstanceInfo(instance_name)
1398
        if inst.primary_node in node_to_primary:
1399
          node_to_primary[inst.primary_node].add(inst.name)
1400
        for secnode in inst.secondary_nodes:
1401
          if secnode in node_to_secondary:
1402
            node_to_secondary[secnode].add(inst.name)
1403

    
1404
    # end data gathering
1405

    
1406
    output = []
1407
    for node in nodelist:
1408
      node_output = []
1409
      for field in self.op.output_fields:
1410
        if field == "name":
1411
          val = node.name
1412
        elif field == "pinst_list":
1413
          val = list(node_to_primary[node.name])
1414
        elif field == "sinst_list":
1415
          val = list(node_to_secondary[node.name])
1416
        elif field == "pinst_cnt":
1417
          val = len(node_to_primary[node.name])
1418
        elif field == "sinst_cnt":
1419
          val = len(node_to_secondary[node.name])
1420
        elif field == "pip":
1421
          val = node.primary_ip
1422
        elif field == "sip":
1423
          val = node.secondary_ip
1424
        elif field == "tags":
1425
          val = list(node.GetTags())
1426
        elif field in self.dynamic_fields:
1427
          val = live_data[node.name].get(field, None)
1428
        else:
1429
          raise errors.ParameterError(field)
1430
        node_output.append(val)
1431
      output.append(node_output)
1432

    
1433
    return output
1434

    
1435

    
1436
class LUQueryNodeVolumes(NoHooksLU):
1437
  """Logical unit for getting volumes on node(s).
1438

1439
  """
1440
  _OP_REQP = ["nodes", "output_fields"]
1441

    
1442
  def CheckPrereq(self):
1443
    """Check prerequisites.
1444

1445
    This checks that the fields required are valid output fields.
1446

1447
    """
1448
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1449

    
1450
    _CheckOutputFields(static=["node"],
1451
                       dynamic=["phys", "vg", "name", "size", "instance"],
1452
                       selected=self.op.output_fields)
1453

    
1454

    
1455
  def Exec(self, feedback_fn):
1456
    """Computes the list of nodes and their attributes.
1457

1458
    """
1459
    nodenames = self.nodes
1460
    volumes = rpc.call_node_volumes(nodenames)
1461

    
1462
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1463
             in self.cfg.GetInstanceList()]
1464

    
1465
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1466

    
1467
    output = []
1468
    for node in nodenames:
1469
      if node not in volumes or not volumes[node]:
1470
        continue
1471

    
1472
      node_vols = volumes[node][:]
1473
      node_vols.sort(key=lambda vol: vol['dev'])
1474

    
1475
      for vol in node_vols:
1476
        node_output = []
1477
        for field in self.op.output_fields:
1478
          if field == "node":
1479
            val = node
1480
          elif field == "phys":
1481
            val = vol['dev']
1482
          elif field == "vg":
1483
            val = vol['vg']
1484
          elif field == "name":
1485
            val = vol['name']
1486
          elif field == "size":
1487
            val = int(float(vol['size']))
1488
          elif field == "instance":
1489
            for inst in ilist:
1490
              if node not in lv_by_node[inst]:
1491
                continue
1492
              if vol['name'] in lv_by_node[inst][node]:
1493
                val = inst.name
1494
                break
1495
            else:
1496
              val = '-'
1497
          else:
1498
            raise errors.ParameterError(field)
1499
          node_output.append(str(val))
1500

    
1501
        output.append(node_output)
1502

    
1503
    return output
1504

    
1505

    
1506
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    node_verify_list = [self.sstore.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
    }

    result = rpc.call_node_verify(node_verify_list, node_verify_param)
    for verifier in node_verify_list:
      if not result[verifier]:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                 " for remote verification" % verifier)
      if result[verifier]['nodelist']:
        for failed in result[verifier]['nodelist']:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, result[verifier]['nodelist'][failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = self.sstore.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      result = rpc.call_upload_file([node], fname)
      if not result[node]:
        logger.Error("could not copy file %s to node %s" % (fname, node))

    if not self.op.readd:
      logger.Info("adding node %s to cluster.conf" % node)
      self.cfg.AddNode(new_node)
      # Add the new node to the Ganeti Lock Manager
      self.context.glm.add(locking.LEVEL_NODE, node)


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      }

    return result


class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance


  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
         succeeded with the mapping from node devices to instance devices
  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two-pass mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


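# Note (illustrative, hypothetical names): _AssembleInstanceDisks returns a
# (disks_ok, device_info) pair, where device_info contains one tuple per disk
# of the form (primary_node, iv_name, result_of_primary_assemble), e.g.:
#
#   (True, [("node1.example.com", "sda", <result of call_blockdev_assemble>),
#           ("node1.example.com", "sdb", <result of call_blockdev_assemble>)])
#
# LUActivateInstanceDisks passes this list back to the caller unchanged.
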
class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    if not type(ins_l) is list:
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is false, errors on the primary node are not
  ignored.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                             " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                             " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                             " needed %s MiB, available %s MiB" %
                             (node, reason, requested, free_mem))


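# Note (usage sketch, mirroring the calls made elsewhere in this module):
# _CheckNodeFreeMemory is invoked before starting or failing over an instance,
# e.g.:
#
#   _CheckNodeFreeMemory(self.cfg, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        instance.memory)
#
# so a failure reads "Not enough memory on node NODE for starting instance
# NAME: needed X MiB, available Y MiB".
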
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")


class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                           constants.INSTANCE_REBOOT_HARD,
                           constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(instance, self.cfg)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   self.op.pnode)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node" % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))


  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
                                                old_file_storage_dir,
                                                new_file_storage_dir)

      if not result:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                          "sda", "sdb"):
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    if not _RemoveDisks(instance, self.cfg):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)
    # Remove the new instance from the Ganeti Lock Manager
    self.context.glm.remove(locking.LEVEL_INSTANCE, instance.name)


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
                               "admin_state", "admin_ram",
                               "disk_template", "ip", "mac", "bridge",
                               "sda_size", "sdb_size", "vcpus", "tags"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedInstances(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    instance_names = self.wanted
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
                     in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          disk = instance.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        elif field == "vcpus":
          val = instance.vcpus
        elif field == "tags":
          val = list(instance.GetTags())
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output


class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
                         instance.name, instance.memory)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s do not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

    self.instance = instance

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down" %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logger.Info("Starting instance %s on node %s" %
                  (instance.name, target_node))

      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))


def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
        return False

  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, True, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _GenerateUniqueNames(cfg, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance.

  """
  results = []
  for val in exts:
    new_id = cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results


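# Note (illustrative, hypothetical IDs): _GenerateUniqueNames prefixes each
# requested extension with its own freshly generated unique ID, so a call such
# as _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta"]) might return
# something like ["4ba59e1f-....sda_data", "c9d0a7e2-....sda_meta"].
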
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
  """Generate a drbd8 device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id = (primary, secondary, port),
                          children = [dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev


def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
    disks = []
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name = "sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name = "sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                         disk_sz, names[0:2], "sda")
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                         swap_sz, names[2:4], "sdb")
    disks = [drbd_sda_dev, drbd_sdb_dev]
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
                                iv_name="sda", logical_id=(file_driver,
                                "%s/sda" % file_storage_dir))
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
                                iv_name="sdb", logical_id=(file_driver,
                                "%s/sdb" % file_storage_dir))
    disks = [file_sda_dev, file_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks


def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


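# Note (illustrative sketch): for the DT_DRBD8 template, _GenerateDiskTemplate
# produces two DRBD8 disks ("sda" and "sdb"), each backed by a data LV and a
# 128 MB metadata LV, roughly:
#
#   Disk(LD_DRBD8, iv_name="sda",
#        logical_id=(primary_node, secondary_node, port),
#        children=[Disk(LD_LV, <id>.sda_data), Disk(LD_LV, <id>.sda_meta)])
#
# and the metadata text attached to the disks comes from _GetInstanceInfoText,
# e.g. "originstname+instance1.example.com".
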
def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = rpc.call_file_storage_dir_create(instance.primary_node,
                                              file_storage_dir)

    if not result:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False

    if not result[0]:
      logger.Error("failed to create directory '%s'" % file_storage_dir)
      return False

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False

  return True


def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
                                            file_storage_dir):
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      result = False

  return result


def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group

  This is currently hard-coded for the two-drive layout.

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    # 256 MB are added for drbd metadata, 128MB for each drbd device
    constants.DT_DRBD8: disk_size + swap_size + 256,
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]


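# Note (worked example, hypothetical sizes): for a 10240 MB disk and a 4096 MB
# swap volume, _ComputeDiskSize returns 10240 + 4096 = 14336 for DT_PLAIN and
# 10240 + 4096 + 256 = 14592 for DT_DRBD8 (the extra 256 MB covering the two
# 128 MB DRBD metadata volumes); DT_DISKLESS and DT_FILE need no space in the
# volume group and yield None.
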
class LUCreateInstance(LogicalUnit):
2873
  """Create an instance.
2874

2875
  """
2876
  HPATH = "instance-add"
2877
  HTYPE = constants.HTYPE_INSTANCE
2878
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
2879
              "disk_template", "swap_size", "mode", "start", "vcpus",
2880
              "wait_for_sync", "ip_check", "mac"]
2881

    
2882
  def _RunAllocator(self):
2883
    """Run the allocator based on input opcode.
2884

2885
    """
2886
    disks = [{"size": self.op.disk_size, "mode": "w"},
2887
             {"size": self.op.swap_size, "mode": "w"}]
2888
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
2889
             "bridge": self.op.bridge}]
2890
    ial = IAllocator(self.cfg, self.sstore,
2891
                     mode=constants.IALLOCATOR_MODE_ALLOC,
2892
                     name=self.op.instance_name,
2893
                     disk_template=self.op.disk_template,
2894
                     tags=[],
2895
                     os=self.op.os_type,
2896
                     vcpus=self.op.vcpus,
2897
                     mem_size=self.op.mem_size,
2898
                     disks=disks,
2899
                     nics=nics,
2900
                     )
2901

    
2902
    ial.Run(self.op.iallocator)
2903

    
2904
    if not ial.success:
2905
      raise errors.OpPrereqError("Can't compute nodes using"
2906
                                 " iallocator '%s': %s" % (self.op.iallocator,
2907
                                                           ial.info))
2908
    if len(ial.nodes) != ial.required_nodes:
2909
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
2910
                                 " of nodes (%s), required %s" %
2911
                                 (len(ial.nodes), ial.required_nodes))
2912
    self.op.pnode = ial.nodes[0]
2913
    logger.ToStdout("Selected nodes for the instance: %s" %
2914
                    (", ".join(ial.nodes),))
2915
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
2916
                (self.op.instance_name, self.op.iallocator, ial.nodes))
2917
    if ial.required_nodes == 2:
2918
      self.op.snode = ial.nodes[1]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    ))

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl
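
  # BuildHooksEnv returns (env, pre_nodes, post_nodes): the environment for
  # the instance-add hooks plus the node lists on which the pre/post hooks
  # run (here: master, primary and any secondaries).  The keys above are
  # merged with the generic instance keys produced by _BuildInstanceHookEnv,
  # so a hook script ends up seeing variables along the lines of
  # INSTANCE_NAME, INSTANCE_PRIMARY, INSTANCE_DISK_TEMPLATE and
  # INSTANCE_ADD_MODE (illustrative; the hooks runner may add its own
  # prefix to the names).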


  def CheckPrereq(self):
    """Check prerequisites.

    """
    # set optional parameters to none if they don't exist
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
                 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
                 "vnc_bind_address"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

    #### instance parameters check

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)

    self.op.instance_name = instance_name = hostname1.name
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    # ip validity checks
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = self.op.ip = inst_ip

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname1.ip, instance_name))

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")
    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory not a relative"
                                 " path")
    #### allocator run

    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                 self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
                                   self.op.snode)
      elif snode_name == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(snode_name)

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))
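
    # The node info RPC above is expected to return a mapping from node name
    # to a dict of node parameters; only 'vg_free' (free space in the volume
    # group, in MB, as used in the message above) is consulted here.  A
    # rough, hypothetical example of the shape this code relies on:
    #
    #   nodeinfo = {"node1.example.com": {"vg_free": 102400, ...},
    #               "node2.example.com": {"vg_free": 51200, ...}}
    #
    # Anything missing or non-integer is treated as "cannot compute" above.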

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node" % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")


    # bridge check on primary node
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.op.mem_size)

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    if self.op.vnc_bind_address is None:
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.sstore.GetFileStorageDir(),
                                        string_file_storage_dir, instance))
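
    # Worked example (hypothetical values): with a cluster file storage
    # directory of "/srv/ganeti/file-storage", an opcode file_storage_dir of
    # "mysubdir" and instance "instance1.example.com", the join above yields
    # "/srv/ganeti/file-storage/mysubdir/instance1.example.com"; when no
    # subdirectory was given the empty string simply drops out of the path.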


    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            hvm_acpi=self.op.hvm_acpi,
                            hvm_pae=self.op.hvm_pae,
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
                            vnc_bind_address=self.op.vnc_bind_address,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Add the new instance to the Ganeti Lock Manager
    self.context.glm.add(locking.LEVEL_INSTANCE, instance)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      # Remove the new instance from the Ganeti Lock Manager
      self.context.glm.remove(locking.LEVEL_INSTANCE, iobj.name)
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                                src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance)
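
    # console_cmd is the hypervisor-specific console invocation; for a Xen
    # instance it is typically along the lines of
    # "xm console instance1.example.com" (illustrative).  Below it is wrapped
    # into an ssh argument vector for the primary node, which the caller is
    # expected to execute on the master node, as described in the class
    # docstring.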

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    logger.ToStdout("Selected new secondary for the instance: %s" %
                    self.op.remote_node)
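
  # In relocation mode the allocator is asked only for a replacement of the
  # current secondary (relocate_from above), so required_nodes should be 1
  # and ial.nodes[0] becomes the new secondary.  A rough sketch of the
  # request data (names hypothetical): mode=IALLOCATOR_MODE_RELOC,
  # name="instance1.example.com", relocate_from=["node2.example.com"].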

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    self.op.instance_name = instance.name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      self.op.remote_node = self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
    self.op.remote_node = remote_node

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s on %s" % (dev.iv_name, pri_node))
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find device %s on node %s" %
                                 (dev.iv_name, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      size = dev.size
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))

      iv_names[dev.iv_name] = (dev, dev.children)

    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for dev in instance.disks:
      size = dev.size
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
      # create new devices on new_node
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=(pri_node, new_node,
                                          dev.logical_id[2]),
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
                                        new_drbd, False,
                                      _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new DRBD on"
                                 " node '%s'" % new_node)

    for dev in instance.disks:
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for %s on old node" % dev.iv_name)
      cfg.SetDiskID(dev, old_node)
      if not rpc.call_blockdev_shutdown(old_node, dev):
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    done = 0
    for dev in instance.disks:
      cfg.SetDiskID(dev, pri_node)
      # set the physical (unique in bdev terms) id to None, meaning
      # detach from network
      dev.physical_id = (None,) * len(dev.physical_id)
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
      if rpc.call_blockdev_find(pri_node, dev):
        done += 1
      else:
        warning("Failed to detach drbd %s from network, unusual case" %
                dev.iv_name)

    if not done:
      # no detaches succeeded (very unlikely)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev in instance.disks:
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    failures = []
    for dev in instance.disks:
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
      # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      # is correct
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
                "please do a gnt-instance info to see the status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        if not rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance

    # Activate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
      self.proc.ChainOpCode(op)

    if instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")

    ret = fn(feedback_fn)

    # Deactivate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
      self.proc.ChainOpCode(op)

    return ret


class LUGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk", "amount"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    self.op.instance_name = instance.name

    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing.")

    if instance.FindDisk(self.op.disk) is None:
      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                 (self.op.disk, instance.name))

    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
    for node in nodenames:
      info = nodeinfo.get(node, None)
      if not info:
        raise errors.OpPrereqError("Cannot get current information"
                                   " from node '%s'" % node)
      vg_free = info.get('vg_free', None)
      if not isinstance(vg_free, int):
        raise errors.OpPrereqError("Can't compute free disk space on"
                                   " node %s" % node)
      if self.op.amount > info['vg_free']:
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                   " %d MiB available, %d MiB required" %
                                   (node, info['vg_free'], self.op.amount))

  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    disk = instance.FindDisk(self.op.disk)
    for node in (instance.secondary_nodes + (instance.primary_node,)):
      self.cfg.SetDiskID(disk, node)
      result = rpc.call_blockdev_grow(node, disk, self.op.amount)
      if not result or not isinstance(result, tuple) or len(result) != 2:
        raise errors.OpExecError("grow request failed to node %s" % node)
      elif not result[0]:
        raise errors.OpExecError("grow request failed to node %s: %s" %
                                 (node, result[1]))
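
    # call_blockdev_grow above is expected to hand back a (status, message)
    # pair; as an illustration, something like (True, "") on success or
    # (False, "not enough free space in volume group") on failure -- the
    # exact message text comes from the backend and is only echoed here.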
    disk.RecordGrow(self.op.amount)
    self.cfg.Update(instance)
    return


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]
    return


  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }
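
    # The result nests the same way the disk objects do; for a drbd disk with
    # two LV children it comes out roughly as (values illustrative only):
    #
    #   {"iv_name": "sda", "dev_type": <drbd type constant>,
    #    "logical_id": (...), "physical_id": (...),
    #    "pstatus": <blockdev_find result on the primary>,
    #    "sstatus": <blockdev_find result on the secondary>,
    #    "children": [<same structure for the data LV>,
    #                 <same structure for the meta LV>]}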

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                                instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "vcpus": instance.vcpus,
        }

      htkind = self.sstore.GetHypervisorType()
      if htkind == constants.HT_XEN_PVM30:
        idict["kernel_path"] = instance.kernel_path
        idict["initrd_path"] = instance.initrd_path

      if htkind == constants.HT_XEN_HVM31:
        idict["hvm_boot_order"] = instance.hvm_boot_order
        idict["hvm_acpi"] = instance.hvm_acpi
        idict["hvm_pae"] = instance.hvm_pae
        idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path

      if htkind in constants.HTS_REQ_PORT:
        idict["vnc_bind_address"] = instance.vnc_bind_address
        idict["network_port"] = instance.network_port

      result[instance.name] = idict

    return result


class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge or self.mac:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      if self.mac:
        mac = self.mac
      else:
        mac = self.instance.nics[0].mac
      args['nics'] = [(ip, bridge, mac)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    # FIXME: all the parameters could be checked before, in ExpandNames, or in
    # a separate CheckArguments function, if we implement one, so the operation
    # can be aborted without waiting for any lock, should it have an error...
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
    self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
    self.hvm_pae = getattr(self.op, "hvm_pae", None)
    self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
    self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
                 self.kernel_path, self.initrd_path, self.hvm_boot_order,
                 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
                 self.vnc_bind_address]
    if all_parms.count(None) == len(all_parms):
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    if self.kernel_path is not None:
      self.do_kernel_path = True
      if self.kernel_path == constants.VALUE_NONE:
        raise errors.OpPrereqError("Can't set instance to no kernel")

      if self.kernel_path != constants.VALUE_DEFAULT:
        if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
                                    " filename")
    else:
      self.do_kernel_path = False

    if self.initrd_path is not None:
      self.do_initrd_path = True
      if self.initrd_path not in (constants.VALUE_NONE,
                                  constants.VALUE_DEFAULT):
        if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
                                    " filename")
    else:
      self.do_initrd_path = False

    # boot order verification
    if self.hvm_boot_order is not None:
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
        if len(self.hvm_boot_order.strip("acdn")) != 0:
          raise errors.OpPrereqError("invalid boot order specified,"
                                     " must be one or more of [acdn]"
                                     " or 'default'")

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.
    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        instance.hvm_boot_order = None
      else:
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))
    if self.hvm_acpi:
      instance.hvm_acpi = self.hvm_acpi
      result.append(("hvm_acpi", self.hvm_acpi))
    if self.hvm_pae:
      instance.hvm_pae = self.hvm_pae
      result.append(("hvm_pae", self.hvm_pae))
    if self.hvm_cdrom_image_path:
      instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
      result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
    if self.vnc_bind_address:
      instance.vnc_bind_address = self.vnc_bind_address
      result.append(("vnc_bind_address", self.vnc_bind_address))

    self.cfg.Update(instance)
4245

    
4246
    return result
4247

    
4248

    
4249
class LUQueryExports(NoHooksLU):
4250
  """Query the exports list
4251

4252
  """
4253
  _OP_REQP = []
4254

    
4255
  def CheckPrereq(self):
4256
    """Check that the nodelist contains only existing nodes.
4257

4258
    """
4259
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4260

    
4261
  def Exec(self, feedback_fn):
4262
    """Compute the list of all the exported system images.
4263

4264
    Returns:
4265
      a dictionary with the structure node->(export-list)
4266
      where export-list is a list of the instances exported on
4267
      that node.
4268

4269
    """
    return rpc.call_export_list(self.nodes)


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(instance, self.cfg)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
        logger.Error("could not export block device %s from node %s to node %s"
                     % (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from node %s" %
                     (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.
    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    op = opcodes.OpQueryExports(nodes=[])
    exportlist = self.proc.ChainOpCode(op)
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
        if not rpc.call_export_remove(node, instance_name):
          logger.Error("could not remove export for instance %s"
                       " on node %s" % (instance_name, node))

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg/sstore that are needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, cfg, sstore, mode, name, **kwargs):
    self.cfg = cfg
    self.sstore = sstore
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    # cluster data
    data = {
      "version": 1,
      "cluster_name": self.sstore.GetClusterName(),
      "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      # we don't have job IDs
      }

    i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()
    node_data = rpc.call_node_info(node_list, cfg.GetVGName())
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      if nname not in node_data or not isinstance(node_data[nname], dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname]
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += iinfo.memory
          if iinfo.status == "up":
            i_p_up_mem += iinfo.memory

      # compute memory used by instances
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": iinfo.vcpus,
        "memory": iinfo.memory,
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template,
                                  self.disks[0]["size"], self.disks[1]["size"])

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1

    disk_space = _ComputeDiskSize(instance.disk_template,
                                  instance.disks[0].size,
                                  instance.disks[1].size)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
    """Run an instance allocator and return the results.

    """
    data = self.in_text

    result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)

    if not isinstance(result, tuple) or len(result) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" %
                               (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    of the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       )
    else:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result