#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform
import logging

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer

class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_MASTER: the LU needs to run on the master node
        REQ_WSSTORE: the LU needs a writable SimpleStore
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_MASTER = True
  REQ_WSSTORE = False
  REQ_BGL = True

  def __init__(self, processor, op, context, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.sstore = sstore
    self.context = context
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    if not self.cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    if self.REQ_MASTER:
      master = sstore.GetMasterNode()
      if master != utils.HostInfo().name:
        raise errors.OpPrereqError("Commands must be run on the master"
                                   " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.sstore)
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:
      - Use an empty dict if you don't need any lock
      - If you don't need any lock at a particular level omit that level
      - Don't put anything for the BGL level
      - If you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples:
    # Acquire all nodes and one instance
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
    }
    # Acquire just two nodes
    self.needed_locks = {
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
    }
    # Acquire no locks
    self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    Args:
      hook_results: the results of the multi-node hooks rpc call
      phase: the hooks phase that has just been run
      feedback_fn: function to send feedback back to the caller
      lu_result: the previous result this LU had, or None in the PRE phase.

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to:

    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]

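# Illustrative sketch (not taken from this file, names are hypothetical): a
# concurrent LU built on the helpers above would typically combine
# _ExpandAndLockInstance and _LockInstancesNodes like this:
#
#   class LUExampleInstanceOp(LogicalUnit):
#     _OP_REQP = ["instance_name"]
#     REQ_BGL = False
#
#     def ExpandNames(self):
#       self._ExpandAndLockInstance()
#       self.needed_locks[locking.LEVEL_NODE] = []
#       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
#
#     def DeclareLocks(self, level):
#       if level == locking.LEVEL_NODE:
#         self._LockInstancesNodes()
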
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)

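# Usage sketch: the query-style LUs further below call this from ExpandNames
# to turn user-supplied (possibly shortened) names into expanded, sorted lock
# names, e.g.:
#
#   self.needed_locks[locking.LEVEL_NODE] = _GetWantedNodes(self, self.op.nodes)
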
def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)

def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields
    selected: Fields selected by the caller

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))

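# Usage sketch (mirroring the query LUs further below): callers pass the
# fields they support plus the user's selection, and an invalid selection is
# rejected before any RPC is made:
#
#   _CheckOutputFields(static=["name", "pip", "sip"],
#                      dynamic=["mfree", "dtotal"],
#                      selected=self.op.output_fields)
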
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings
  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env

def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override
  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)

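# For reference, running a single-NIC instance through the two helpers above
# produces a hook environment of roughly this shape (values are illustrative;
# the hooks runner later adds the "GANETI_" prefix to every key):
#
#   OP_TARGET=inst1.example.com   INSTANCE_NAME=inst1.example.com
#   INSTANCE_PRIMARY=node1        INSTANCE_SECONDARIES=node2
#   INSTANCE_OS_TYPE=debian-etch  INSTANCE_STATUS=up
#   INSTANCE_MEMORY=128           INSTANCE_VCPUS=1
#   INSTANCE_NIC_COUNT=1          INSTANCE_NIC0_IP=
#   INSTANCE_NIC0_BRIDGE=br0      INSTANCE_NIC0_HWADDR=aa:00:00:11:22:33
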
def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))

class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master

class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result['node-net-test'][node]))

    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if not instanceconfig.status == 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          needed_mem += instance_cfg[instance].memory
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

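  # Worked example for the N+1 check above (numbers are illustrative): if
  # node3 is secondary for inst1 (primary node1, 512MB) and for inst2/inst3
  # (primary node2, 1024MB + 512MB), then
  #   node_info['node3']['sinst-by-pnode'] ==
  #     {'node1': ['inst1'], 'node2': ['inst2', 'inst3']}
  # and node3 passes only if its 'mfree' is at least 512 for a node1 failure
  # and at least 1536 for a node2 failure.
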
  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase and their failure causes
    their output to be logged in the verify output and makes the verification
    fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo]
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result =  self._VerifyInstance(instance, inst_config, node_volume,
                                     node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result, handle it, and send some
    nicely-formatted feedback back to the user.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: previous Exec result

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn("    Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result

class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result

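# For reference, LUVerifyDisks.Exec above returns a 4-tuple
# (res_nodes, res_nlvm, res_instances, res_missing):
#   - res_nodes: nodes that could not be contacted or returned invalid data
#   - res_nlvm: node name -> LVM error string where LV listing failed
#   - res_instances: instances with at least one non-online LV
#   - res_missing: instance name -> list of (node, volume) pairs not found
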
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]
  REQ_WSSTORE = True

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      if not rpc.call_node_start_master(master, False):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")

def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether an LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV

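# Example: for a mirrored disk object whose children are two LVs,
# _RecursiveCheckIfLVMBased returns True via the recursion on disk.children;
# for a disk tree containing no LD_LV device it returns False.
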
class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    # FIXME: This only works because there is only one parameter that can be
    # changed or removed.
    if not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      node_list = self.acquired_locks[locking.LEVEL_NODE]
      vglist = rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name != self.cfg.GetVGName():
      self.cfg.SetVGName(self.op.vg_name)
    else:
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")

def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded

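# Note on the mirror status tuples consumed above: each mstat entry unpacks as
# (perc_done, est_time, is_degraded, ldisk); _WaitForSync deliberately ignores
# the ldisk field, while the related _CheckDiskConsistency helper below
# exposes the same is_degraded/ldisk distinction through its ldisk parameter.
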
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  cfgw.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result

class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
    _CheckOutputFields(static=[],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

      Args:
        node_list: a list with the names of all nodes
        rlist: a map with node names as keys and OS objects as values

      Returns:
        map: a map with osnames as keys and as value another map, with
             nodes as keys and lists of OS objects as values
             e.g. {"debian-etch": {"node1": [<object>,...],
                                   "node2": [<object>,]}
                  }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if not nr:
        continue
      for os_obj in nr:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.acquired_locks[locking.LEVEL_NODE]
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output

class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    self.context.RemoveNode(node.name)

    rpc.call_node_leave_cluster(node.name)

class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    self.dynamic_fields = frozenset([
      "dtotal", "dfree",
      "mtotal", "mnode", "mfree",
      "bootid",
      "ctotal",
      ])

    self.static_fields = frozenset([
      "name", "pinst_cnt", "sinst_cnt",
      "pinst_list", "sinst_list",
      "pip", "sip", "tags",
      "serial_no",
      ])

    _CheckOutputFields(static=self.static_fields,
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted


  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in the _GetWantedNodes,
    # if non empty, and if empty, there's no validation to do
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      nodenames = self.wanted
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
    else:
      nodenames = all_info.keys()
    nodelist = [all_info[name] for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field == "serial_no":
          val = node.serial_no
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output

class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False

  def ExpandNames(self):
    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # check reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

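    # Ask the master's node daemon to verify ssh/hostname connectivity to the
    # new node before any configuration files are distributed to it.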
    node_verify_list = [self.sstore.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
    }

    result = rpc.call_node_verify(node_verify_list, node_verify_param)
    for verifier in node_verify_list:
      if not result[verifier]:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                 " for remote verification" % verifier)
      if result[verifier]['nodelist']:
        for failed in result[verifier]['nodelist']:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, result[verifier]['nodelist'][failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

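    # Push the cluster configuration (ssconf) files to the new node only; for
    # HVM clusters the VNC password file is copied along with them.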
    to_copy = self.sstore.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      result = rpc.call_upload_file([node], fname)
      if not result[node]:
        logger.Error("could not copy file %s to node %s" % (fname, node))

    if self.op.readd:
      self.context.ReaddNode(new_node)
    else:
      self.context.AddNode(new_node)


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      }

    return result


class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
         succeeded with the mapping from node devices to instance devices
  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

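  # The result of the primary-node assemble call in this second pass is also
  # recorded in device_info (per the docstring above, the node-visible name
  # for each instance disk).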
  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    _SafeShutdownInstanceDisks(instance, self.cfg)


def _SafeShutdownInstanceDisks(instance, cfg):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  """
  ins_l = rpc.call_instance_list([instance.primary_node])
  ins_l = ins_l[instance.primary_node]
  if not type(ins_l) is list:
    raise errors.OpExecError("Can't contact node '%s'" %
                             instance.primary_node)

  if instance.name in ins_l:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")

  _ShutdownInstanceDisks(instance, cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are ignored (they
  do not cause a failure return value).

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


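# _CheckNodeFreeMemory is called from the CheckPrereq of several LUs below
# (for example LUStartupInstance and LUFailoverInstance), so an instance is
# only started or failed over when the target node reports enough free RAM.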
def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                             " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                             " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                             " needed %s MiB, available %s MiB" %
                             (node, reason, requested, free_mem))


class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

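    # Note: the instance is marked "up" in the configuration before the
    # actual start; if the start below fails, the admin state stays "up".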
    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")


class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # only a full reboot touches the disks on the secondary nodes, so for
      # the other reboot types locking the primary node is enough
      primary_only = self.op.reboot_type != constants.INSTANCE_REBOOT_FULL
      self._LockInstancesNodes(primary_only=primary_only)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

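    # soft and hard reboots are delegated to the node daemon in a single
    # call; a full reboot is done below as shutdown + disk restart + start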
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(instance, self.cfg)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   self.op.pnode)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node" % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.Update(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))


  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
                                                old_file_storage_dir,
                                                new_file_storage_dir)

      if not result:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                          "sda", "sdb"):
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    if not _RemoveDisks(instance, self.cfg):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    self.static_fields = frozenset([
      "name", "os", "pnode", "snodes",
      "admin_state", "admin_ram",
      "disk_template", "ip", "mac", "bridge",
      "sda_size", "sdb_size", "vcpus", "tags",
      "network_port", "kernel_path", "initrd_path",
      "hvm_boot_order", "hvm_acpi", "hvm_pae",
      "hvm_cdrom_image_path", "hvm_nic_type",
      "hvm_disk_type", "vnc_bind_address",
      "serial_no",
      ])
    _CheckOutputFields(static=self.static_fields,
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.do_locking:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    all_info = self.cfg.GetAllInstancesInfo()
    if self.do_locking:
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
    elif self.wanted != locking.ALL_SET:
      instance_names = self.wanted
      missing = set(instance_names).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some instances were removed before retrieving their data: %s"
          % missing)
    else:
      instance_names = all_info.keys()
    instance_list = [all_info[iname] for iname in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

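    # The computed "status" field below folds the admin state and the live
    # state together: "running", "ADMIN_down", "ERROR_up", "ERROR_down", or
    # "ERROR_nodedown" when the primary node could not be contacted.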
    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          disk = instance.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        elif field == "vcpus":
          val = instance.vcpus
        elif field == "tags":
          val = list(instance.GetTags())
        elif field == "serial_no":
          val = instance.serial_no
        elif field in ("network_port", "kernel_path", "initrd_path",
                       "hvm_boot_order", "hvm_acpi", "hvm_pae",
                       "hvm_cdrom_image_path", "hvm_nic_type",
                       "hvm_disk_type", "vnc_bind_address"):
          val = getattr(instance, field, None)
          if val is not None:
            pass
          elif field in ("hvm_nic_type", "hvm_disk_type",
                         "kernel_path", "initrd_path"):
            val = "default"
          else:
            val = "-"
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output


class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
                         instance.name, instance.memory)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s do not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down" %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logger.Info("Starting instance %s on node %s" %
                  (instance.name, target_node))

      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))


def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
        return False

  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, True, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True
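

# Note: the names generated below have the form "<unique id><ext>", e.g. for
# exts [".sda", ".sdb"] something like "<id>.sda"/"<id>.sdb"; the exact id
# format is whatever the config writer's GenerateUniqueID() returns.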
def _GenerateUniqueNames(cfg, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance.

  """
  results = []
  for val in exts:
    new_id = cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results
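

# A "DRBD8 branch" is one complete instance disk: a DRBD8 device whose two
# children are the data LV (of the requested size) and a small metadata LV
# (128 MB), with (primary, secondary, port, minors, secret) as its logical id.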
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name,
                         p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  shared_secret = cfg.GenerateDRBDSecret()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev
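

# Disk layouts produced below, per template: DT_DISKLESS has no disks,
# DT_PLAIN uses two plain LVs (sda/sdb), DT_DRBD8 uses two DRBD8 branches
# mirrored to the single secondary node, and DT_FILE uses two file-backed
# disks under the instance's file storage directory.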
def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
    disks = []
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name = "sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name = "sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    (minor_pa, minor_pb,
     minor_sa, minor_sb) = cfg.AllocateDRBDMinor(
      [primary_node, primary_node, remote_node, remote_node], instance_name)

    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                        disk_sz, names[0:2], "sda",
                                        minor_pa, minor_sa)
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                        swap_sz, names[2:4], "sdb",
                                        minor_pb, minor_sb)
    disks = [drbd_sda_dev, drbd_sdb_dev]
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
                                iv_name="sda", logical_id=(file_driver,
                                "%s/sda" % file_storage_dir))
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
                                iv_name="sdb", logical_id=(file_driver,
                                "%s/sdb" % file_storage_dir))
    disks = [file_sda_dev, file_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks


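# Illustrative note (not in the original source): a typical call for the
# drbd8 template would look roughly like
#
#   disks = _GenerateDiskTemplate(cfg, constants.DT_DRBD8,
#                                 "inst1.example.com", "node1.example.com",
#                                 ["node2.example.com"],
#                                 10240, 4096,  # disk and swap size, in MiB
#                                 "", None)
#
# and yields two DRBD8 branches ("sda" and "sdb"); the instance and node
# names here are made up for illustration only.
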
def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = rpc.call_file_storage_dir_create(instance.primary_node,
                                              file_storage_dir)

    if not result:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False

    if not result[0]:
      logger.Error("failed to create directory '%s'" % file_storage_dir)
      return False

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False

  return True


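# Illustrative note (not in the original source): disks are created on the
# secondary nodes first (with force=False, so only device types that need a
# secondary copy are materialised there) and on the primary node last, where
# creation is always forced; if any step fails the function returns False
# and the caller is expected to clean up via _RemoveDisks().
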
def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
                                            file_storage_dir):
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      result = False

  return result


def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group

  This is currently hard-coded for the two-drive layout.

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    # 256 MB are added for drbd metadata, 128MB for each drbd device
    constants.DT_DRBD8: disk_size + swap_size + 256,
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" %  disk_template)

  return req_size_dict[disk_template]


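# Illustrative note (not in the original source): for a 10240 MiB data disk
# and a 4096 MiB swap disk, the volume group on each involved node needs
#   plain:  10240 + 4096       = 14336 MiB free
#   drbd8:  10240 + 4096 + 256 = 14592 MiB free (the extra 256 MiB covers
#                                 the two 128 MiB DRBD metadata volumes)
# while the diskless and file templates need no space in the volume group.
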
class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync", "ip_check", "mac"]
  REQ_BGL = False

  def _ExpandNode(self, node):
    """Expands and checks one node name.

    """
    node_full = self.cfg.ExpandNodeName(node)
    if node_full is None:
      raise errors.OpPrereqError("Unknown node %s" % node)
    return node_full

  def ExpandNames(self):
    """ExpandNames for CreateInstance.

    Figure out the right locks for instance creation.

    """
    self.needed_locks = {}

    # set optional parameters to none if they don't exist
    for attr in ["kernel_path", "initrd_path", "pnode", "snode",
                 "iallocator", "hvm_boot_order", "hvm_acpi", "hvm_pae",
                 "hvm_cdrom_image_path", "hvm_nic_type", "hvm_disk_type",
                 "vnc_bind_address"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    # verify creation mode
    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)
    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    #### instance parameters check

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)
    self.op.instance_name = instance_name = hostname1.name

    # this is just a preventive check, but someone might still add this
    # instance in the meantime, and creation will fail at lock-add time
    if instance_name in self.cfg.GetInstanceList():
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    self.add_locks[locking.LEVEL_INSTANCE] = instance_name

    # ip validity checks
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = self.op.ip = inst_ip
    # used in CheckPrereq for ip ping check
    self.check_ip = hostname1.ip

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")
    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory path not absolute")

    ### Node/iallocator related checks
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.op.pnode = self._ExpandNode(self.op.pnode)
      nodelist = [self.op.pnode]
      if self.op.snode is not None:
        self.op.snode = self._ExpandNode(self.op.snode)
        nodelist.append(self.op.snode)
      self.needed_locks[locking.LEVEL_NODE] = nodelist

    # in case of import lock the source node too
    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)

      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      self.op.src_node = src_node = self._ExpandNode(src_node)
      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
        self.needed_locks[locking.LEVEL_NODE].append(src_node)

    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

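  # Illustrative note (not in the original source): after ExpandNames the
  # node-level locking needs are either
  #   self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
  # when an iallocator will pick the nodes, or an explicit list such as
  #   self.needed_locks[locking.LEVEL_NODE] = ["node1", "node2"]
  # containing the primary, the optional secondary and, for imports, the
  # source node; the node names shown here are placeholders.
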
  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    disks = [{"size": self.op.disk_size, "mode": "w"},
             {"size": self.op.swap_size, "mode": "w"}]
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
             "bridge": self.op.bridge}]
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.op.vcpus,
                     mem_size=self.op.mem_size,
                     disks=disks,
                     nics=nics,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    logger.ToStdout("Selected nodes for the instance: %s" %
                    (", ".join(ial.nodes),))
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
                (self.op.instance_name, self.op.iallocator, ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]

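  # Illustrative note (not in the original source): the disks/nics passed to
  # the IAllocator above are plain dicts, e.g. for a 10240 MiB root disk and
  # a 4096 MiB swap disk something like
  #   disks = [{"size": 10240, "mode": "w"}, {"size": 4096, "mode": "w"}]
  #   nics  = [{"mac": "auto", "ip": None, "bridge": "xen-br0"}]
  # (the concrete sizes and bridge name are examples only); the allocator
  # plugin then returns the chosen node names in ial.nodes.
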
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    ))

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl


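  # Illustrative note (not in the original source): on top of the generic
  # instance variables added by _BuildInstanceHookEnv, the hooks for this LU
  # see creation-specific keys such as
  #   INSTANCE_DISK_TEMPLATE=drbd  INSTANCE_DISK_SIZE=10240
  #   INSTANCE_SWAP_SIZE=4096      INSTANCE_ADD_MODE=create
  # (values shown are examples); the hooks machinery typically exports them
  # to the hook scripts with a GANETI_ prefix.
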
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = self.op.src_node
      src_path = self.op.src_path

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage

    # ip ping checks (we use the same ip that was resolved in ExpandNames)

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (self.check_ip, self.op.instance_name))

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    #### allocator run

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
    assert self.pnode is not None, \
      "Cannot retrieve locked node %s" % self.op.pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if self.op.snode is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")
      if self.op.snode == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(self.op.snode)

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node"  % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")

    # bridge check on primary node
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.op.mem_size)

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      # FIXME (als): shouldn't these checks happen on the destination node?
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    # Xen HVM device type checks
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
        raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
                                   " hypervisor" % self.op.hvm_nic_type)
      if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
        raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
                                   " hypervisor" % self.op.hvm_disk_type)

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    if self.op.vnc_bind_address is None:
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.sstore.GetFileStorageDir(),
                                        string_file_storage_dir, instance))


    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            hvm_acpi=self.op.hvm_acpi,
                            hvm_pae=self.op.hvm_pae,
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
                            vnc_bind_address=self.op.vnc_bind_address,
                            hvm_nic_type=self.op.hvm_nic_type,
                            hvm_disk_type=self.op.hvm_disk_type,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      _RemoveDisks(iobj, self.cfg)
      self.cfg.ReleaseDRBDMinors(instance)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Declare that we don't want to remove the instance lock anymore, as we've
    # added the instance to the config
    del self.remove_locks[locking.LEVEL_INSTANCE]
    # Remove the temp. assignments for the instance's drbds
    self.cfg.ReleaseDRBDMinors(instance)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      # Make sure the instance lock gets removed
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                                src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


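# Illustrative note (not in the original source): a client drives this LU
# through its opcode; assuming the matching opcode class is
# opcodes.OpCreateInstance, a minimal "plain" creation could look roughly
# like
#   op = opcodes.OpCreateInstance(instance_name="inst1.example.com",
#                                 mem_size=512, disk_size=10240,
#                                 swap_size=4096, disk_template="plain",
#                                 mode="create", os_type="debian-etch",
#                                 pnode="node1.example.com", start=True,
#                                 vcpus=1, wait_for_sync=True, ip_check=True,
#                                 mac="auto")
# the field names mirror _OP_REQP above; the class name, the template/mode
# values and the host names are assumptions for illustration only.
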
class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)


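# Illustrative note (not in the original source): the value returned by
# LUConnectConsole.Exec is the ssh invocation built by ssh.BuildCmd,
# conceptually something like
#   ssh -t ... root@<primary-node> '<hypervisor console command>'
# which the command-line client is expected to exec on the master node; the
# exact flags depend on ssh.BuildCmd and on the hypervisor in use.
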
class LUReplaceDisks(LogicalUnit):
3640
  """Replace the disks of an instance.
3641

3642
  """
3643
  HPATH = "mirrors-replace"
3644
  HTYPE = constants.HTYPE_INSTANCE
3645
  _OP_REQP = ["instance_name", "mode", "disks"]
3646
  REQ_BGL = False
3647

    
3648
  def ExpandNames(self):
3649
    self._ExpandAndLockInstance()
3650

    
3651
    if not hasattr(self.op, "remote_node"):
3652
      self.op.remote_node = None
3653

    
3654
    ia_name = getattr(self.op, "iallocator", None)
3655
    if ia_name is not None:
3656
      if self.op.remote_node is not None:
3657
        raise errors.OpPrereqError("Give either the iallocator or the new"
3658
                                   " secondary, not both")
3659
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3660
    elif self.op.remote_node is not None:
3661
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3662
      if remote_node is None:
3663
        raise errors.OpPrereqError("Node '%s' not known" %
3664
                                   self.op.remote_node)
3665
      self.op.remote_node = remote_node
3666
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
3667
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3668
    else:
3669
      self.needed_locks[locking.LEVEL_NODE] = []
3670
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3671

    
3672
  def DeclareLocks(self, level):
3673
    # If we're not already locking all nodes in the set we have to declare the
3674
    # instance's primary/secondary nodes.
3675
    if (level == locking.LEVEL_NODE and
3676
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
3677
      self._LockInstancesNodes()
3678

    
3679
  def _RunAllocator(self):
3680
    """Compute a new secondary node using an IAllocator.
3681

3682
    """
3683
    ial = IAllocator(self.cfg, self.sstore,
3684
                     mode=constants.IALLOCATOR_MODE_RELOC,
3685
                     name=self.op.instance_name,
3686
                     relocate_from=[self.sec_node])
3687

    
3688
    ial.Run(self.op.iallocator)
3689

    
3690
    if not ial.success:
3691
      raise errors.OpPrereqError("Can't compute nodes using"
3692
                                 " iallocator '%s': %s" % (self.op.iallocator,
3693
                                                           ial.info))
3694
    if len(ial.nodes) != ial.required_nodes:
3695
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3696
                                 " of nodes (%s), required %s" %
3697
                                 (len(ial.nodes), ial.required_nodes))
3698
    self.op.remote_node = ial.nodes[0]
3699
    logger.ToStdout("Selected new secondary for the instance: %s" %
3700
                    self.op.remote_node)
3701

    
3702
  def BuildHooksEnv(self):
3703
    """Build hooks env.
3704

3705
    This runs on the master, the primary and all the secondaries.
3706

3707
    """
3708
    env = {
3709
      "MODE": self.op.mode,
3710
      "NEW_SECONDARY": self.op.remote_node,
3711
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3712
      }
3713
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3714
    nl = [
3715
      self.sstore.GetMasterNode(),
3716
      self.instance.primary_node,
3717
      ]
3718
    if self.op.remote_node is not None:
3719
      nl.append(self.op.remote_node)
3720
    return env, nl, nl
3721

    
3722
  def CheckPrereq(self):
3723
    """Check prerequisites.
3724

3725
    This checks that the instance is in the cluster.
3726

3727
    """
3728
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3729
    assert instance is not None, \
3730
      "Cannot retrieve locked instance %s" % self.op.instance_name
3731
    self.instance = instance
3732

    
3733
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3734
      raise errors.OpPrereqError("Instance's disk layout is not"
3735
                                 " network mirrored.")
3736

    
3737
    if len(instance.secondary_nodes) != 1:
3738
      raise errors.OpPrereqError("The instance has a strange layout,"
3739
                                 " expected one secondary but found %d" %
3740
                                 len(instance.secondary_nodes))
3741

    
3742
    self.sec_node = instance.secondary_nodes[0]
3743

    
3744
    ia_name = getattr(self.op, "iallocator", None)
3745
    if ia_name is not None:
3746
      self._RunAllocator()
3747

    
3748
    remote_node = self.op.remote_node
3749
    if remote_node is not None:
3750
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3751
      assert self.remote_node_info is not None, \
3752
        "Cannot retrieve locked node %s" % remote_node
3753
    else:
3754
      self.remote_node_info = None
3755
    if remote_node == instance.primary_node:
3756
      raise errors.OpPrereqError("The specified node is the primary node of"
3757
                                 " the instance.")
3758
    elif remote_node == self.sec_node:
3759
      if self.op.mode == constants.REPLACE_DISK_SEC:
3760
        # this is for DRBD8, where we can't execute the same mode of
3761
        # replacement as for drbd7 (no different port allocated)
3762
        raise errors.OpPrereqError("Same secondary given, cannot execute"
3763
                                   " replacement")
3764
    if instance.disk_template == constants.DT_DRBD8:
3765
      if (self.op.mode == constants.REPLACE_DISK_ALL and
3766
          remote_node is not None):
3767
        # switch to replace secondary mode
3768
        self.op.mode = constants.REPLACE_DISK_SEC
3769

    
3770
      if self.op.mode == constants.REPLACE_DISK_ALL:
3771
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3772
                                   " secondary disk replacement, not"
3773
                                   " both at once")
3774
      elif self.op.mode == constants.REPLACE_DISK_PRI:
3775
        if remote_node is not None:
3776
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3777
                                     " the secondary while doing a primary"
3778
                                     " node disk replacement")
3779
        self.tgt_node = instance.primary_node
3780
        self.oth_node = instance.secondary_nodes[0]
3781
      elif self.op.mode == constants.REPLACE_DISK_SEC:
3782
        self.new_node = remote_node # this can be None, in which case
3783
                                    # we don't change the secondary
3784
        self.tgt_node = instance.secondary_nodes[0]
3785
        self.oth_node = instance.primary_node
3786
      else:
3787
        raise errors.ProgrammerError("Unhandled disk replace mode")
3788

    
3789
    for name in self.op.disks:
3790
      if instance.FindDisk(name) is None:
3791
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3792
                                   (name, instance.name))
3793

    
3794
  def _ExecD8DiskOnly(self, feedback_fn):
3795
    """Replace a disk on the primary or secondary for dbrd8.
3796

3797
    The algorithm for replace is quite complicated:
3798
      - for each disk to be replaced:
3799
        - create new LVs on the target node with unique names
3800
        - detach old LVs from the drbd device
3801
        - rename old LVs to name_replaced.<time_t>
3802
        - rename new LVs to old LVs
3803
        - attach the new LVs (with the old names now) to the drbd device
3804
      - wait for sync across all devices
3805
      - for each modified disk:
3806
        - remove old LVs (which have the name name_replaced.<time_t>)
3807

3808
    Failures are not very well handled.
3809

3810
    """
3811
    steps_total = 6
3812
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3813
    instance = self.instance
3814
    iv_names = {}
3815
    vgname = self.cfg.GetVGName()
3816
    # start of work
3817
    cfg = self.cfg
3818
    tgt_node = self.tgt_node
3819
    oth_node = self.oth_node
3820

    
3821
    # Step: check device activation
3822
    self.proc.LogStep(1, steps_total, "check device existence")
3823
    info("checking volume groups")
3824
    my_vg = cfg.GetVGName()
3825
    results = rpc.call_vg_list([oth_node, tgt_node])
3826
    if not results:
3827
      raise errors.OpExecError("Can't list volume groups on the nodes")
3828
    for node in oth_node, tgt_node:
3829
      res = results.get(node, False)
3830
      if not res or my_vg not in res:
3831
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3832
                                 (my_vg, node))
3833
    for dev in instance.disks:
3834
      if not dev.iv_name in self.op.disks:
3835
        continue
3836
      for node in tgt_node, oth_node:
3837
        info("checking %s on %s" % (dev.iv_name, node))
3838
        cfg.SetDiskID(dev, node)
3839
        if not rpc.call_blockdev_find(node, dev):
3840
          raise errors.OpExecError("Can't find device %s on node %s" %
3841
                                   (dev.iv_name, node))
3842

    
3843
    # Step: check other node consistency
3844
    self.proc.LogStep(2, steps_total, "check peer consistency")
3845
    for dev in instance.disks:
3846
      if not dev.iv_name in self.op.disks:
3847
        continue
3848
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3849
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3850
                                   oth_node==instance.primary_node):
3851
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3852
                                 " to replace disks on this node (%s)" %
3853
                                 (oth_node, tgt_node))
3854

    
3855
    # Step: create new storage
3856
    self.proc.LogStep(3, steps_total, "allocate new storage")
3857
    for dev in instance.disks:
3858
      if not dev.iv_name in self.op.disks:
3859
        continue
3860
      size = dev.size
3861
      cfg.SetDiskID(dev, tgt_node)
3862
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3863
      names = _GenerateUniqueNames(cfg, lv_names)
3864
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3865
                             logical_id=(vgname, names[0]))
3866
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3867
                             logical_id=(vgname, names[1]))
3868
      new_lvs = [lv_data, lv_meta]
3869
      old_lvs = dev.children
3870
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3871
      info("creating new local storage on %s for %s" %
3872
           (tgt_node, dev.iv_name))
3873
      # since we *always* want to create this LV, we use the
3874
      # _Create...OnPrimary (which forces the creation), even if we
3875
      # are talking about the secondary node
3876
      for new_lv in new_lvs:
3877
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3878
                                        _GetInstanceInfoText(instance)):
3879
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3880
                                   " node '%s'" %
3881
                                   (new_lv.logical_id[1], tgt_node))
3882

    
3883
    # Step: for each lv, detach+rename*2+attach
3884
    self.proc.LogStep(4, steps_total, "change drbd configuration")
3885
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3886
      info("detaching %s drbd from local storage" % dev.iv_name)
3887
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3888
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3889
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3890
      #dev.children = []
3891
      #cfg.Update(instance)
3892

    
3893
      # ok, we created the new LVs, so now we know we have the needed
3894
      # storage; as such, we proceed on the target node to rename
3895
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3896
      # using the assumption that logical_id == physical_id (which in
3897
      # turn is the unique_id on that node)
3898

    
3899
      # FIXME(iustin): use a better name for the replaced LVs
3900
      temp_suffix = int(time.time())
3901
      ren_fn = lambda d, suff: (d.physical_id[0],
3902
                                d.physical_id[1] + "_replaced-%s" % suff)
3903
      # build the rename list based on what LVs exist on the node
3904
      rlist = []
3905
      for to_ren in old_lvs:
3906
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3907
        if find_res is not None: # device exists
3908
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3909

    
3910
      info("renaming the old LVs on the target node")
3911
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3912
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3913
      # now we rename the new LVs to the old LVs
3914
      info("renaming the new LVs on the target node")
3915
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3916
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3917
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3918

    
3919
      for old, new in zip(old_lvs, new_lvs):
3920
        new.logical_id = old.logical_id
3921
        cfg.SetDiskID(new, tgt_node)
3922

    
3923
      for disk in old_lvs:
3924
        disk.logical_id = ren_fn(disk, temp_suffix)
3925
        cfg.SetDiskID(disk, tgt_node)
3926

    
3927
      # now that the new lvs have the old name, we can add them to the device
3928
      info("adding new mirror component on %s" % tgt_node)
3929
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3930
        for new_lv in new_lvs:
3931
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
3932
            warning("Can't rollback device %s", hint="manually cleanup unused"
3933
                    " logical volumes")
3934
        raise errors.OpExecError("Can't add local storage to drbd")
3935

    
3936
      dev.children = new_lvs
3937
      cfg.Update(instance)
3938

    
3939
    # Step: wait for sync
3940

    
3941
    # this can fail as the old devices are degraded and _WaitForSync
3942
    # does a combined result over all disks, so we don't check its
3943
    # return value
3944
    self.proc.LogStep(5, steps_total, "sync devices")
3945
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3946

    
3947
    # so check manually all the devices
3948
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3949
      cfg.SetDiskID(dev, instance.primary_node)
3950
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3951
      if is_degr:
3952
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3953

    
3954
    # Step: remove old storage
3955
    self.proc.LogStep(6, steps_total, "removing old storage")
3956
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3957
      info("remove logical volumes for %s" % name)
3958
      for lv in old_lvs:
3959
        cfg.SetDiskID(lv, tgt_node)
3960
        if not rpc.call_blockdev_remove(tgt_node, lv):
3961
          warning("Can't remove old LV", hint="manually remove unused LVs")
3962
          continue
3963

    
3964
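  # Illustrative note (not in the original source): the LV swap in
  # _ExecD8DiskOnly above is a double rename on the target node, roughly
  #   <old-uuid>.sda_data  ->  <old-uuid>.sda_data_replaced-<time_t>
  #   <new-uuid>.sda_data  ->  <old-uuid>.sda_data
  # so the DRBD device re-attaches its children under the original names
  # while the replaced volumes stay around until step 6 removes them; the
  # volume names shown are placeholders.
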
  def _ExecD8Secondary(self, feedback_fn):
3965
    """Replace the secondary node for drbd8.
3966

3967
    The algorithm for replace is quite complicated:
3968
      - for all disks of the instance:
3969
        - create new LVs on the new node with same names
3970
        - shutdown the drbd device on the old secondary
3971
        - disconnect the drbd network on the primary
3972
        - create the drbd device on the new secondary
3973
        - network attach the drbd on the primary, using an artifice:
3974
          the drbd code for Attach() will connect to the network if it
3975
          finds a device which is connected to the good local disks but
3976
          not network enabled
3977
      - wait for sync across all devices
3978
      - remove all disks from the old secondary
3979

3980
    Failures are not very well handled.
3981

3982
    """
3983
    steps_total = 6
3984
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3985
    instance = self.instance
3986
    iv_names = {}
3987
    vgname = self.cfg.GetVGName()
3988
    # start of work
3989
    cfg = self.cfg
3990
    old_node = self.tgt_node
3991
    new_node = self.new_node
3992
    pri_node = instance.primary_node
3993

    
3994
    # Step: check device activation
3995
    self.proc.LogStep(1, steps_total, "check device existence")
3996
    info("checking volume groups")
3997
    my_vg = cfg.GetVGName()
3998
    results = rpc.call_vg_list([pri_node, new_node])
3999
    if not results:
4000
      raise errors.OpExecError("Can't list volume groups on the nodes")
4001
    for node in pri_node, new_node:
4002
      res = results.get(node, False)
4003
      if not res or my_vg not in res:
4004
        raise errors.OpExecError("Volume group '%s' not found on %s" %
4005
                                 (my_vg, node))
4006
    for dev in instance.disks:
4007
      if not dev.iv_name in self.op.disks:
4008
        continue
4009
      info("checking %s on %s" % (dev.iv_name, pri_node))
4010
      cfg.SetDiskID(dev, pri_node)
4011
      if not rpc.call_blockdev_find(pri_node, dev):
4012
        raise errors.OpExecError("Can't find device %s on node %s" %
4013
                                 (dev.iv_name, pri_node))
4014

    
4015
    # Step: check other node consistency
4016
    self.proc.LogStep(2, steps_total, "check peer consistency")
4017
    for dev in instance.disks:
4018
      if not dev.iv_name in self.op.disks:
4019
        continue
4020
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4021
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
4022
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
4023
                                 " unsafe to replace the secondary" %
4024
                                 pri_node)
4025

    
4026
    # Step: create new storage
4027
    self.proc.LogStep(3, steps_total, "allocate new storage")
4028
    for dev in instance.disks:
4029
      size = dev.size
4030
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4031
      # since we *always* want to create this LV, we use the
4032
      # _Create...OnPrimary (which forces the creation), even if we
4033
      # are talking about the secondary node
4034
      for new_lv in dev.children:
4035
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
4036
                                        _GetInstanceInfoText(instance)):
4037
          raise errors.OpExecError("Failed to create new LV named '%s' on"
4038
                                   " node '%s'" %
4039
                                   (new_lv.logical_id[1], new_node))
4040

    
4041

    
4042
    # Step 4: drbd minors and drbd setup changes
4043
    # after this, we must manually remove the drbd minors on both the
4044
    # error and the success paths
4045
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4046
                                   instance.name)
4047
    logging.debug("Allocated minors %s" % (minors,))
4048
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
4049
    for dev, new_minor in zip(instance.disks, minors):
4050
      size = dev.size
4051
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4052
      # create new devices on new_node
4053
      if pri_node == dev.logical_id[0]:
4054
        new_logical_id = (pri_node, new_node,
4055
                          dev.logical_id[2], dev.logical_id[3], new_minor,
4056
                          dev.logical_id[5])
4057
      else:
4058
        new_logical_id = (new_node, pri_node,
4059
                          dev.logical_id[2], new_minor, dev.logical_id[4],
4060
                          dev.logical_id[5])
4061
      iv_names[dev.iv_name] = (dev, dev.children, new_logical_id)
4062
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4063
                    new_logical_id)
4064
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4065
                              logical_id=new_logical_id,
4066
                              children=dev.children)
4067
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
4068
                                        new_drbd, False,
4069
                                      _GetInstanceInfoText(instance)):
4070
        self.cfg.ReleaseDRBDMinors(instance.name)
4071
        raise errors.OpExecError("Failed to create new DRBD on"
4072
                                 " node '%s'" % new_node)
4073

    
4074
    for dev in instance.disks:
4075
      # we have new devices, shutdown the drbd on the old secondary
4076
      info("shutting down drbd for %s on old node" % dev.iv_name)
4077
      cfg.SetDiskID(dev, old_node)
4078
      if not rpc.call_blockdev_shutdown(old_node, dev):
4079
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4080
                hint="Please cleanup this device manually as soon as possible")
4081

    
4082
    info("detaching primary drbds from the network (=> standalone)")
4083
    done = 0
4084
    for dev in instance.disks:
4085
      cfg.SetDiskID(dev, pri_node)
4086
      # set the network part of the physical (unique in bdev terms) id
4087
      # to None, meaning detach from network
4088
      dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4089
      # and 'find' the device, which will 'fix' it to match the
4090
      # standalone state
4091
      if rpc.call_blockdev_find(pri_node, dev):
4092
        done += 1
4093
      else:
4094
        warning("Failed to detach drbd %s from network, unusual case" %
4095
                dev.iv_name)
4096

    
4097
    if not done:
4098
      # no detaches succeeded (very unlikely)
4099
      self.cfg.ReleaseDRBDMinors(instance.name)
4100
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
4101

    
4102
    # if we managed to detach at least one, we update all the disks of
4103
    # the instance to point to the new secondary
4104
    info("updating instance configuration")
4105
    for dev, _, new_logical_id in iv_names.itervalues():
4106
      dev.logical_id = new_logical_id
4107
      cfg.SetDiskID(dev, pri_node)
4108
    cfg.Update(instance)
4109
    # we can remove now the temp minors as now the new values are
4110
    # written to the config file (and therefore stable)
4111
    self.cfg.ReleaseDRBDMinors(instance.name)
4112

    
4113
    # and now perform the drbd attach
4114
    info("attaching primary drbds to new secondary (standalone => connected)")
4115
    failures = []
4116
    for dev in instance.disks:
4117
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4118
      # since the attach is smart, it's enough to 'find' the device,
4119
      # it will automatically activate the network, if the physical_id
4120
      # is correct
4121
      cfg.SetDiskID(dev, pri_node)
4122
      logging.debug("Disk to attach: %s", dev)
4123
      if not rpc.call_blockdev_find(pri_node, dev):
4124
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4125
                "please do a gnt-instance info to see the status of disks")
4126

    
4127
    # this can fail as the old devices are degraded and _WaitForSync
4128
    # does a combined result over all disks, so we don't check its
4129
    # return value
4130
    self.proc.LogStep(5, steps_total, "sync devices")
4131
    _WaitForSync(cfg, instance, self.proc, unlock=True)
4132

    
4133
    # so check manually all the devices
4134
    for name, (dev, old_lvs, _) in iv_names.iteritems():
4135
      cfg.SetDiskID(dev, pri_node)
4136
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
4137
      if is_degr:
4138
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
4139

    
4140
    self.proc.LogStep(6, steps_total, "removing old storage")
4141
    for name, (dev, old_lvs, _) in iv_names.iteritems():
4142
      info("remove logical volumes for %s" % name)
4143
      for lv in old_lvs:
4144
        cfg.SetDiskID(lv, old_node)
4145
        if not rpc.call_blockdev_remove(old_node, lv):
4146
          warning("Can't remove LV on old secondary",
4147
                  hint="Cleanup stale volumes by hand")
4148

    
4149
  def Exec(self, feedback_fn):
4150
    """Execute disk replacement.
4151

4152
    This dispatches the disk replacement to the appropriate handler.
4153

4154
    """
4155
    instance = self.instance
4156

    
4157
    # Activate the instance disks if we're replacing them on a down instance
4158
    if instance.status == "down":
4159
      _StartInstanceDisks(self.cfg, instance, True)
4160

    
4161
    if instance.disk_template == constants.DT_DRBD8:
4162
      if self.op.remote_node is None:
4163
        fn = self._ExecD8DiskOnly
4164
      else:
4165
        fn = self._ExecD8Secondary
4166
    else:
4167
      raise errors.ProgrammerError("Unhandled disk replacement case")
4168

    
4169
    ret = fn(feedback_fn)
4170

    
4171
    # Deactivate the instance disks if we're replacing them on a down instance
4172
    if instance.status == "down":
4173
      _SafeShutdownInstanceDisks(instance, self.cfg)
4174

    
4175
    return ret
4176

    
4177

    
4178
class LUGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk", "amount"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    self.instance = instance

    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing.")

    if instance.FindDisk(self.op.disk) is None:
      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                 (self.op.disk, instance.name))

    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
    for node in nodenames:
      info = nodeinfo.get(node, None)
      if not info:
        raise errors.OpPrereqError("Cannot get current information"
                                   " from node '%s'" % node)
      vg_free = info.get('vg_free', None)
      if not isinstance(vg_free, int):
        raise errors.OpPrereqError("Can't compute free disk space on"
                                   " node %s" % node)
      if self.op.amount > info['vg_free']:
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                   " %d MiB available, %d MiB required" %
                                   (node, info['vg_free'], self.op.amount))

  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    disk = instance.FindDisk(self.op.disk)
    for node in (instance.secondary_nodes + (instance.primary_node,)):
      self.cfg.SetDiskID(disk, node)
      result = rpc.call_blockdev_grow(node, disk, self.op.amount)
      if (not result or not isinstance(result, (list, tuple)) or
          len(result) != 2):
        raise errors.OpExecError("grow request failed to node %s" % node)
      elif not result[0]:
        raise errors.OpExecError("grow request failed to node %s: %s" %
                                 (node, result[1]))
    disk.RecordGrow(self.op.amount)
    self.cfg.Update(instance)
    return


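# LUQueryInstanceData returns a dict keyed by instance name; each value
# carries the fields built in Exec below ("name", "config_state", "run_state",
# "pnode", "snodes", "os", "memory", "nics", "disks", "vcpus"), plus
# hypervisor-specific fields: kernel_path/initrd_path for Xen PVM, the hvm_*
# fields for Xen HVM, and the VNC fields for hypervisors that use a network
# port (constants.HTS_REQ_PORT).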
class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = self.cfg.ExpandInstanceName(name)
        if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" % name)
        self.wanted_names.append(full_name)
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
    else:
      self.wanted_names = None
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]
    return

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                           instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "vcpus": instance.vcpus,
        }

      htkind = self.sstore.GetHypervisorType()
      if htkind == constants.HT_XEN_PVM30:
        idict["kernel_path"] = instance.kernel_path
        idict["initrd_path"] = instance.initrd_path

      if htkind == constants.HT_XEN_HVM31:
        idict["hvm_boot_order"] = instance.hvm_boot_order
        idict["hvm_acpi"] = instance.hvm_acpi
        idict["hvm_pae"] = instance.hvm_pae
        idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
        idict["hvm_nic_type"] = instance.hvm_nic_type
        idict["hvm_disk_type"] = instance.hvm_disk_type

      if htkind in constants.HTS_REQ_PORT:
        if instance.vnc_bind_address is None:
          vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
        else:
          vnc_bind_address = instance.vnc_bind_address
        if instance.network_port is None:
          vnc_console_port = None
        elif vnc_bind_address == constants.BIND_ADDRESS_GLOBAL:
          vnc_console_port = "%s:%s" % (instance.primary_node,
                                        instance.network_port)
        elif vnc_bind_address == constants.LOCALHOST_IP_ADDRESS:
          vnc_console_port = "%s:%s on node %s" % (vnc_bind_address,
                                                   instance.network_port,
                                                   instance.primary_node)
        else:
          vnc_console_port = "%s:%s" % (instance.vnc_bind_address,
                                        instance.network_port)
        idict["vnc_console_port"] = vnc_console_port
        idict["vnc_bind_address"] = vnc_bind_address
        idict["network_port"] = instance.network_port

      result[instance.name] = idict

    return result


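# Usage sketch for LUSetInstanceParams (the opcode spelling is an assumption
# based on the LU name, not taken from this module): changing the memory of
# an instance while skipping the free-memory checks would look roughly like
#   opcodes.OpSetInstanceParams(instance_name="instance1.example.com",
#                               mem=1024, force=True)
# All accepted parameters are read via getattr() in CheckPrereq below, and
# every change takes effect only at the next restart of the instance.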
class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge or self.mac:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      if self.mac:
        mac = self.mac
      else:
        mac = self.instance.nics[0].mac
      args['nics'] = [(ip, bridge, mac)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    # FIXME: all the parameters could be checked before, in ExpandNames, or in
    # a separate CheckArguments function, if we implement one, so the operation
    # can be aborted without waiting for any lock, should it have an error...
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
    self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
    self.hvm_pae = getattr(self.op, "hvm_pae", None)
    self.hvm_nic_type = getattr(self.op, "hvm_nic_type", None)
    self.hvm_disk_type = getattr(self.op, "hvm_disk_type", None)
    self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
    self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
    self.force = getattr(self.op, "force", None)
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
                 self.kernel_path, self.initrd_path, self.hvm_boot_order,
                 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
                 self.vnc_bind_address, self.hvm_nic_type, self.hvm_disk_type]
    if all_parms.count(None) == len(all_parms):
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    if self.kernel_path is not None:
      self.do_kernel_path = True
      if self.kernel_path == constants.VALUE_NONE:
        raise errors.OpPrereqError("Can't set instance to no kernel")

      if self.kernel_path != constants.VALUE_DEFAULT:
        if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
                                     " filename")
    else:
      self.do_kernel_path = False

    if self.initrd_path is not None:
      self.do_initrd_path = True
      if self.initrd_path not in (constants.VALUE_NONE,
                                  constants.VALUE_DEFAULT):
        if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
                                     " filename")
    else:
      self.do_initrd_path = False

    # boot order verification
    if self.hvm_boot_order is not None:
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
        if len(self.hvm_boot_order.strip("acdn")) != 0:
          raise errors.OpPrereqError("invalid boot order specified,"
                                     " must be one or more of [acdn]"
                                     " or 'default'")

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      if not (os.path.isabs(self.op.hvm_cdrom_image_path) or
              self.op.hvm_cdrom_image_path.lower() == "none"):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not (os.path.isfile(self.op.hvm_cdrom_image_path) or
              self.op.hvm_cdrom_image_path.lower() == "none"):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.warn = []
    if self.mem is not None and not self.force:
      pnode = self.instance.primary_node
      nodelist = [pnode]
      nodelist.extend(instance.secondary_nodes)
      instance_info = rpc.call_instance_info(pnode, instance.name)
      nodeinfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

      if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s" % pnode)
      else:
        if instance_info:
          current_mem = instance_info['memory']
        else:
          # Assume instance not running
          # (there is a slight race condition here, but it's not very probable,
          # and we have no other way to check)
          current_mem = 0
        miss_mem = self.mem - current_mem - nodeinfo[pnode]['memory_free']
        if miss_mem > 0:
          raise errors.OpPrereqError("This change will prevent the instance"
                                     " from starting, due to %d MB of memory"
                                     " missing on its primary node" % miss_mem)

      for node in instance.secondary_nodes:
        if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
          self.warn.append("Can't get info from secondary node %s" % node)
        elif self.mem > nodeinfo[node]['memory_free']:
          self.warn.append("Not enough memory to failover instance to secondary"
                           " node %s" % node)

    # Xen HVM device type checks
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if self.op.hvm_nic_type is not None:
        if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
          raise errors.OpPrereqError("Invalid NIC type %s specified for Xen"
                                     " HVM hypervisor" % self.op.hvm_nic_type)
      if self.op.hvm_disk_type is not None:
        if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
          raise errors.OpPrereqError("Invalid disk type %s specified for Xen"
                                     " HVM hypervisor" % self.op.hvm_disk_type)

    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    # Process here the warnings from CheckPrereq, as we don't have a
    # feedback_fn there.
    for warn in self.warn:
      feedback_fn("WARNING: %s" % warn)

    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        instance.hvm_boot_order = None
      else:
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))
    if self.hvm_acpi is not None:
      instance.hvm_acpi = self.hvm_acpi
      result.append(("hvm_acpi", self.hvm_acpi))
    if self.hvm_pae is not None:
      instance.hvm_pae = self.hvm_pae
      result.append(("hvm_pae", self.hvm_pae))
    if self.hvm_nic_type is not None:
      instance.hvm_nic_type = self.hvm_nic_type
      result.append(("hvm_nic_type", self.hvm_nic_type))
    if self.hvm_disk_type is not None:
      instance.hvm_disk_type = self.hvm_disk_type
      result.append(("hvm_disk_type", self.hvm_disk_type))
    if self.hvm_cdrom_image_path:
      if self.hvm_cdrom_image_path == constants.VALUE_NONE:
        instance.hvm_cdrom_image_path = None
      else:
        instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
      result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
    if self.vnc_bind_address:
      instance.vnc_bind_address = self.vnc_bind_address
      result.append(("vnc_bind_address", self.vnc_bind_address))

    self.cfg.Update(instance)

    return result


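# LUQueryExports returns a node -> export-list mapping; an empty 'nodes'
# argument means "all nodes", since ExpandNames then locks locking.ALL_SET.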
class LUQueryExports(NoHooksLU):
  """Query the exports list.

  """
  _OP_REQP = ['nodes']
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)


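# Export flow implemented by LUExportInstance.Exec below: optionally shut the
# instance down, snapshot its "sda" disk, restart the instance if it was up,
# copy the snapshot to the target node, remove the snapshot from the source
# node, finalize the export on the target node and finally purge older
# exports of the same instance from the other nodes.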
class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
          "Cannot retrieve locked instance %s" % self.op.instance_name

    self.dst_node = self.cfg.GetNodeInfo(
      self.cfg.ExpandNodeName(self.op.target_node))

    assert self.dst_node is not None, \
          "Cannot retrieve locked node %s" % self.op.target_node

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(instance, self.cfg)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
        logger.Error("could not export block device %s from node %s to node %s"
                     % (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from node %s" %
                     (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      exportlist = rpc.call_export_list(nodelist)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.
    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    exportlist = rpc.call_export_list(self.acquired_locks[locking.LEVEL_NODE])
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
        if not rpc.call_export_remove(node, instance_name):
          logger.Error("could not remove export for instance %s"
                       " on node %s" % (instance_name, node))

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


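# The tags LUs below operate on one of three kinds of objects
# (constants.TAG_CLUSTER, TAG_NODE or TAG_INSTANCE); TagsLU.ExpandNames locks
# only the named node or instance, and TagsLU.CheckPrereq resolves the target
# object whose tags are read or modified.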
class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the list of (path, tag) pairs matching the pattern.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets one or more tags on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tags.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tags from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


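# Usage sketch for LUTestDelay (the opcode spelling is an assumption based on
# the LU name, not taken from this module): sleeping 5 seconds on the master
# and on two nodes would be requested roughly as
#   opcodes.OpTestDelay(duration=5.0, on_master=True,
#                       on_nodes=["node1.example.com", "node2.example.com"])
# The duration is passed unchanged to utils.TestDelay and to the test_delay
# RPC in Exec below.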
class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))


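# Shape of the data exchanged with the external allocator script, as built
# and checked by the IAllocator methods below:
#
#   input (self.in_data, serialized into self.in_text):
#     {"version": 1, "cluster_name": ..., "cluster_tags": [...],
#      "hypervisor_type": ...,
#      "nodes": {node_name: {"total_memory": ..., "free_memory": ...,
#                            "reserved_memory": ..., "total_disk": ...,
#                            "free_disk": ..., "total_cpus": ..., ...}},
#      "instances": {instance_name: {"memory": ..., "vcpus": ..., "os": ...,
#                                    "nics": [...], "disks": [...], ...}},
#      "request": {"type": "allocate" | "relocate", ...}}
#
#   output (self.out_data, parsed from the script's stdout): a dict that must
#     contain at least the keys "success", "info" and "nodes" (a list), which
#     are also copied onto the IAllocator object by _ValidateResult.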
class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has the following sets of attributes:
    - cfg/sstore that are needed to query the cluster
    - input data (all members of the mode-specific _ALLO_KEYS or
      _RELO_KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, cfg, sstore, mode, name, **kwargs):
    self.cfg = cfg
    self.sstore = sstore
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    # cluster data
    data = {
      "version": 1,
      "cluster_name": self.sstore.GetClusterName(),
      "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      # we don't have job IDs
      }

    i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()
    node_data = rpc.call_node_info(node_list, cfg.GetVGName())
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      if nname not in node_data or not isinstance(node_data[nname], dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname]
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += iinfo.memory
          if iinfo.status == "up":
            i_p_up_mem += iinfo.memory

      # compute memory used by instances
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": iinfo.vcpus,
        "memory": iinfo.memory,
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template,
                                  self.disks[0]["size"], self.disks[1]["size"])

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1

    disk_space = _ComputeDiskSize(instance.disk_template,
                                  instance.disks[0].size,
                                  instance.disks[1].size)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
    """Run an instance allocator and return the results.

    """
    data = self.in_text

    result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)

    if not isinstance(result, (list, tuple)) or len(result) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict


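# LUTestAllocator exercises the IAllocator framework: with direction
# constants.IALLOCATOR_DIR_IN it only builds and returns the generated input
# text, while with IALLOCATOR_DIR_OUT it runs the named allocator script and
# returns its raw output without validating it.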
class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode of
    the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       )
    else:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result