Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib.py @ 38d7239a

History | View | Annotate | Download (187.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33
import logging
34

    
35
from ganeti import rpc
36
from ganeti import ssh
37
from ganeti import logger
38
from ganeti import utils
39
from ganeti import errors
40
from ganeti import hypervisor
41
from ganeti import locking
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import opcodes
45
from ganeti import serializer
46

    
47

    
48
class LogicalUnit(object):
49
  """Logical Unit base class.
50

51
  Subclasses must follow these rules:
52
    - implement ExpandNames
53
    - implement CheckPrereq
54
    - implement Exec
55
    - implement BuildHooksEnv
56
    - redefine HPATH and HTYPE
57
    - optionally redefine their run requirements:
58
        REQ_MASTER: the LU needs to run on the master node
59
        REQ_WSSTORE: the LU needs a writable SimpleStore
60
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
61

62
  Note that all commands require root permissions.
63

64
  """
65
  HPATH = None
66
  HTYPE = None
67
  _OP_REQP = []
68
  REQ_MASTER = True
69
  REQ_WSSTORE = False
70
  REQ_BGL = True
71

    
72
  def __init__(self, processor, op, context, sstore):
73
    """Constructor for LogicalUnit.
74

75
    This needs to be overriden in derived classes in order to check op
76
    validity.
77

78
    """
79
    self.proc = processor
80
    self.op = op
81
    self.cfg = context.cfg
82
    self.sstore = sstore
83
    self.context = context
84
    # Dicts used to declare locking needs to mcpu
85
    self.needed_locks = None
86
    self.acquired_locks = {}
87
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
88
    self.add_locks = {}
89
    self.remove_locks = {}
90
    # Used to force good behavior when calling helper functions
91
    self.recalculate_locks = {}
92
    self.__ssh = None
93

    
94
    for attr_name in self._OP_REQP:
95
      attr_val = getattr(op, attr_name, None)
96
      if attr_val is None:
97
        raise errors.OpPrereqError("Required parameter '%s' missing" %
98
                                   attr_name)
99

    
100
    if not self.cfg.IsCluster():
101
      raise errors.OpPrereqError("Cluster not initialized yet,"
102
                                 " use 'gnt-cluster init' first.")
103
    if self.REQ_MASTER:
104
      master = sstore.GetMasterNode()
105
      if master != utils.HostInfo().name:
106
        raise errors.OpPrereqError("Commands must be run on the master"
107
                                   " node %s" % master)
108

    
109
  def __GetSSH(self):
110
    """Returns the SshRunner object
111

112
    """
113
    if not self.__ssh:
114
      self.__ssh = ssh.SshRunner(self.sstore)
115
    return self.__ssh
116

    
117
  ssh = property(fget=__GetSSH)
118

    
119
  def ExpandNames(self):
120
    """Expand names for this LU.
121

122
    This method is called before starting to execute the opcode, and it should
123
    update all the parameters of the opcode to their canonical form (e.g. a
124
    short node name must be fully expanded after this method has successfully
125
    completed). This way locking, hooks, logging, ecc. can work correctly.
126

127
    LUs which implement this method must also populate the self.needed_locks
128
    member, as a dict with lock levels as keys, and a list of needed lock names
129
    as values. Rules:
130
      - Use an empty dict if you don't need any lock
131
      - If you don't need any lock at a particular level omit that level
132
      - Don't put anything for the BGL level
133
      - If you want all locks at a level use locking.ALL_SET as a value
134

135
    If you need to share locks (rather than acquire them exclusively) at one
136
    level you can modify self.share_locks, setting a true value (usually 1) for
137
    that level. By default locks are not shared.
138

139
    Examples:
140
    # Acquire all nodes and one instance
141
    self.needed_locks = {
142
      locking.LEVEL_NODE: locking.ALL_SET,
143
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
144
    }
145
    # Acquire just two nodes
146
    self.needed_locks = {
147
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
148
    }
149
    # Acquire no locks
150
    self.needed_locks = {} # No, you can't leave it to the default value None
151

152
    """
153
    # The implementation of this method is mandatory only if the new LU is
154
    # concurrent, so that old LUs don't need to be changed all at the same
155
    # time.
156
    if self.REQ_BGL:
157
      self.needed_locks = {} # Exclusive LUs don't need locks.
158
    else:
159
      raise NotImplementedError
160

    
161
  def DeclareLocks(self, level):
162
    """Declare LU locking needs for a level
163

164
    While most LUs can just declare their locking needs at ExpandNames time,
165
    sometimes there's the need to calculate some locks after having acquired
166
    the ones before. This function is called just before acquiring locks at a
167
    particular level, but after acquiring the ones at lower levels, and permits
168
    such calculations. It can be used to modify self.needed_locks, and by
169
    default it does nothing.
170

171
    This function is only called if you have something already set in
172
    self.needed_locks for the level.
173

174
    @param level: Locking level which is going to be locked
175
    @type level: member of ganeti.locking.LEVELS
176

177
    """
178

    
179
  def CheckPrereq(self):
180
    """Check prerequisites for this LU.
181

182
    This method should check that the prerequisites for the execution
183
    of this LU are fulfilled. It can do internode communication, but
184
    it should be idempotent - no cluster or system changes are
185
    allowed.
186

187
    The method should raise errors.OpPrereqError in case something is
188
    not fulfilled. Its return value is ignored.
189

190
    This method should also update all the parameters of the opcode to
191
    their canonical form if it hasn't been done by ExpandNames before.
192

193
    """
194
    raise NotImplementedError
195

    
196
  def Exec(self, feedback_fn):
197
    """Execute the LU.
198

199
    This method should implement the actual work. It should raise
200
    errors.OpExecError for failures that are somewhat dealt with in
201
    code, or expected.
202

203
    """
204
    raise NotImplementedError
205

    
206
  def BuildHooksEnv(self):
207
    """Build hooks environment for this LU.
208

209
    This method should return a three-node tuple consisting of: a dict
210
    containing the environment that will be used for running the
211
    specific hook for this LU, a list of node names on which the hook
212
    should run before the execution, and a list of node names on which
213
    the hook should run after the execution.
214

215
    The keys of the dict must not have 'GANETI_' prefixed as this will
216
    be handled in the hooks runner. Also note additional keys will be
217
    added by the hooks runner. If the LU doesn't define any
218
    environment, an empty dict (and not None) should be returned.
219

220
    No nodes should be returned as an empty list (and not None).
221

222
    Note that if the HPATH for a LU class is None, this function will
223
    not be called.
224

225
    """
226
    raise NotImplementedError
227

    
228
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
229
    """Notify the LU about the results of its hooks.
230

231
    This method is called every time a hooks phase is executed, and notifies
232
    the Logical Unit about the hooks' result. The LU can then use it to alter
233
    its result based on the hooks.  By default the method does nothing and the
234
    previous result is passed back unchanged but any LU can define it if it
235
    wants to use the local cluster hook-scripts somehow.
236

237
    Args:
238
      phase: the hooks phase that has just been run
239
      hooks_results: the results of the multi-node hooks rpc call
240
      feedback_fn: function to send feedback back to the caller
241
      lu_result: the previous result this LU had, or None in the PRE phase.
242

243
    """
244
    return lu_result
245

    
246
  def _ExpandAndLockInstance(self):
247
    """Helper function to expand and lock an instance.
248

249
    Many LUs that work on an instance take its name in self.op.instance_name
250
    and need to expand it and then declare the expanded name for locking. This
251
    function does it, and then updates self.op.instance_name to the expanded
252
    name. It also initializes needed_locks as a dict, if this hasn't been done
253
    before.
254

255
    """
256
    if self.needed_locks is None:
257
      self.needed_locks = {}
258
    else:
259
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
260
        "_ExpandAndLockInstance called with instance-level locks set"
261
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
262
    if expanded_name is None:
263
      raise errors.OpPrereqError("Instance '%s' not known" %
264
                                  self.op.instance_name)
265
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
266
    self.op.instance_name = expanded_name
267

    
268
  def _LockInstancesNodes(self, primary_only=False):
269
    """Helper function to declare instances' nodes for locking.
270

271
    This function should be called after locking one or more instances to lock
272
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
273
    with all primary or secondary nodes for instances already locked and
274
    present in self.needed_locks[locking.LEVEL_INSTANCE].
275

276
    It should be called from DeclareLocks, and for safety only works if
277
    self.recalculate_locks[locking.LEVEL_NODE] is set.
278

279
    In the future it may grow parameters to just lock some instance's nodes, or
280
    to just lock primaries or secondary nodes, if needed.
281

282
    If should be called in DeclareLocks in a way similar to:
283

284
    if level == locking.LEVEL_NODE:
285
      self._LockInstancesNodes()
286

287
    @type primary_only: boolean
288
    @param primary_only: only lock primary nodes of locked instances
289

290
    """
291
    assert locking.LEVEL_NODE in self.recalculate_locks, \
292
      "_LockInstancesNodes helper function called with no nodes to recalculate"
293

    
294
    # TODO: check if we're really been called with the instance locks held
295

    
296
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
297
    # future we might want to have different behaviors depending on the value
298
    # of self.recalculate_locks[locking.LEVEL_NODE]
299
    wanted_nodes = []
300
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
301
      instance = self.context.cfg.GetInstanceInfo(instance_name)
302
      wanted_nodes.append(instance.primary_node)
303
      if not primary_only:
304
        wanted_nodes.extend(instance.secondary_nodes)
305

    
306
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
307
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
308
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
309
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
310

    
311
    del self.recalculate_locks[locking.LEVEL_NODE]
312

    
313

    
314
class NoHooksLU(LogicalUnit):
315
  """Simple LU which runs no hooks.
316

317
  This LU is intended as a parent for other LogicalUnits which will
318
  run no hooks, in order to reduce duplicate code.
319

320
  """
321
  HPATH = None
322
  HTYPE = None
323

    
324

    
325
def _GetWantedNodes(lu, nodes):
326
  """Returns list of checked and expanded node names.
327

328
  Args:
329
    nodes: List of nodes (strings) or None for all
330

331
  """
332
  if not isinstance(nodes, list):
333
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
334

    
335
  if not nodes:
336
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
337
      " non-empty list of nodes whose name is to be expanded.")
338

    
339
  wanted = []
340
  for name in nodes:
341
    node = lu.cfg.ExpandNodeName(name)
342
    if node is None:
343
      raise errors.OpPrereqError("No such node name '%s'" % name)
344
    wanted.append(node)
345

    
346
  return utils.NiceSort(wanted)
347

    
348

    
349
def _GetWantedInstances(lu, instances):
350
  """Returns list of checked and expanded instance names.
351

352
  Args:
353
    instances: List of instances (strings) or None for all
354

355
  """
356
  if not isinstance(instances, list):
357
    raise errors.OpPrereqError("Invalid argument type 'instances'")
358

    
359
  if instances:
360
    wanted = []
361

    
362
    for name in instances:
363
      instance = lu.cfg.ExpandInstanceName(name)
364
      if instance is None:
365
        raise errors.OpPrereqError("No such instance name '%s'" % name)
366
      wanted.append(instance)
367

    
368
  else:
369
    wanted = lu.cfg.GetInstanceList()
370
  return utils.NiceSort(wanted)
371

    
372

    
373
def _CheckOutputFields(static, dynamic, selected):
374
  """Checks whether all selected fields are valid.
375

376
  Args:
377
    static: Static fields
378
    dynamic: Dynamic fields
379

380
  """
381
  static_fields = frozenset(static)
382
  dynamic_fields = frozenset(dynamic)
383

    
384
  all_fields = static_fields | dynamic_fields
385

    
386
  if not all_fields.issuperset(selected):
387
    raise errors.OpPrereqError("Unknown output fields selected: %s"
388
                               % ",".join(frozenset(selected).
389
                                          difference(all_fields)))
390

    
391

    
392
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
393
                          memory, vcpus, nics):
394
  """Builds instance related env variables for hooks from single variables.
395

396
  Args:
397
    secondary_nodes: List of secondary nodes as strings
398
  """
399
  env = {
400
    "OP_TARGET": name,
401
    "INSTANCE_NAME": name,
402
    "INSTANCE_PRIMARY": primary_node,
403
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
404
    "INSTANCE_OS_TYPE": os_type,
405
    "INSTANCE_STATUS": status,
406
    "INSTANCE_MEMORY": memory,
407
    "INSTANCE_VCPUS": vcpus,
408
  }
409

    
410
  if nics:
411
    nic_count = len(nics)
412
    for idx, (ip, bridge, mac) in enumerate(nics):
413
      if ip is None:
414
        ip = ""
415
      env["INSTANCE_NIC%d_IP" % idx] = ip
416
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
417
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
418
  else:
419
    nic_count = 0
420

    
421
  env["INSTANCE_NIC_COUNT"] = nic_count
422

    
423
  return env
424

    
425

    
426
def _BuildInstanceHookEnvByObject(instance, override=None):
427
  """Builds instance related env variables for hooks from an object.
428

429
  Args:
430
    instance: objects.Instance object of instance
431
    override: dict of values to override
432
  """
433
  args = {
434
    'name': instance.name,
435
    'primary_node': instance.primary_node,
436
    'secondary_nodes': instance.secondary_nodes,
437
    'os_type': instance.os,
438
    'status': instance.os,
439
    'memory': instance.memory,
440
    'vcpus': instance.vcpus,
441
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
442
  }
443
  if override:
444
    args.update(override)
445
  return _BuildInstanceHookEnv(**args)
446

    
447

    
448
def _CheckInstanceBridgesExist(instance):
449
  """Check that the brigdes needed by an instance exist.
450

451
  """
452
  # check bridges existance
453
  brlist = [nic.bridge for nic in instance.nics]
454
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
455
    raise errors.OpPrereqError("one or more target bridges %s does not"
456
                               " exist on destination node '%s'" %
457
                               (brlist, instance.primary_node))
458

    
459

    
460
class LUDestroyCluster(NoHooksLU):
461
  """Logical unit for destroying the cluster.
462

463
  """
464
  _OP_REQP = []
465

    
466
  def CheckPrereq(self):
467
    """Check prerequisites.
468

469
    This checks whether the cluster is empty.
470

471
    Any errors are signalled by raising errors.OpPrereqError.
472

473
    """
474
    master = self.sstore.GetMasterNode()
475

    
476
    nodelist = self.cfg.GetNodeList()
477
    if len(nodelist) != 1 or nodelist[0] != master:
478
      raise errors.OpPrereqError("There are still %d node(s) in"
479
                                 " this cluster." % (len(nodelist) - 1))
480
    instancelist = self.cfg.GetInstanceList()
481
    if instancelist:
482
      raise errors.OpPrereqError("There are still %d instance(s) in"
483
                                 " this cluster." % len(instancelist))
484

    
485
  def Exec(self, feedback_fn):
486
    """Destroys the cluster.
487

488
    """
489
    master = self.sstore.GetMasterNode()
490
    if not rpc.call_node_stop_master(master, False):
491
      raise errors.OpExecError("Could not disable the master role")
492
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
493
    utils.CreateBackup(priv_key)
494
    utils.CreateBackup(pub_key)
495
    return master
496

    
497

    
498
class LUVerifyCluster(LogicalUnit):
499
  """Verifies the cluster status.
500

501
  """
502
  HPATH = "cluster-verify"
503
  HTYPE = constants.HTYPE_CLUSTER
504
  _OP_REQP = ["skip_checks"]
505
  REQ_BGL = False
506

    
507
  def ExpandNames(self):
508
    self.needed_locks = {
509
      locking.LEVEL_NODE: locking.ALL_SET,
510
      locking.LEVEL_INSTANCE: locking.ALL_SET,
511
    }
512
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
513

    
514
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
515
                  remote_version, feedback_fn):
516
    """Run multiple tests against a node.
517

518
    Test list:
519
      - compares ganeti version
520
      - checks vg existance and size > 20G
521
      - checks config file checksum
522
      - checks ssh to other nodes
523

524
    Args:
525
      node: name of the node to check
526
      file_list: required list of files
527
      local_cksum: dictionary of local files and their checksums
528

529
    """
530
    # compares ganeti version
531
    local_version = constants.PROTOCOL_VERSION
532
    if not remote_version:
533
      feedback_fn("  - ERROR: connection to %s failed" % (node))
534
      return True
535

    
536
    if local_version != remote_version:
537
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
538
                      (local_version, node, remote_version))
539
      return True
540

    
541
    # checks vg existance and size > 20G
542

    
543
    bad = False
544
    if not vglist:
545
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
546
                      (node,))
547
      bad = True
548
    else:
549
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
550
                                            constants.MIN_VG_SIZE)
551
      if vgstatus:
552
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
553
        bad = True
554

    
555
    # checks config file checksum
556
    # checks ssh to any
557

    
558
    if 'filelist' not in node_result:
559
      bad = True
560
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
561
    else:
562
      remote_cksum = node_result['filelist']
563
      for file_name in file_list:
564
        if file_name not in remote_cksum:
565
          bad = True
566
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
567
        elif remote_cksum[file_name] != local_cksum[file_name]:
568
          bad = True
569
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
570

    
571
    if 'nodelist' not in node_result:
572
      bad = True
573
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
574
    else:
575
      if node_result['nodelist']:
576
        bad = True
577
        for node in node_result['nodelist']:
578
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
579
                          (node, node_result['nodelist'][node]))
580
    if 'node-net-test' not in node_result:
581
      bad = True
582
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
583
    else:
584
      if node_result['node-net-test']:
585
        bad = True
586
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
587
        for node in nlist:
588
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
589
                          (node, node_result['node-net-test'][node]))
590

    
591
    hyp_result = node_result.get('hypervisor', None)
592
    if hyp_result is not None:
593
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
594
    return bad
595

    
596
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
597
                      node_instance, feedback_fn):
598
    """Verify an instance.
599

600
    This function checks to see if the required block devices are
601
    available on the instance's node.
602

603
    """
604
    bad = False
605

    
606
    node_current = instanceconfig.primary_node
607

    
608
    node_vol_should = {}
609
    instanceconfig.MapLVsByNode(node_vol_should)
610

    
611
    for node in node_vol_should:
612
      for volume in node_vol_should[node]:
613
        if node not in node_vol_is or volume not in node_vol_is[node]:
614
          feedback_fn("  - ERROR: volume %s missing on node %s" %
615
                          (volume, node))
616
          bad = True
617

    
618
    if not instanceconfig.status == 'down':
619
      if (node_current not in node_instance or
620
          not instance in node_instance[node_current]):
621
        feedback_fn("  - ERROR: instance %s not running on node %s" %
622
                        (instance, node_current))
623
        bad = True
624

    
625
    for node in node_instance:
626
      if (not node == node_current):
627
        if instance in node_instance[node]:
628
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
629
                          (instance, node))
630
          bad = True
631

    
632
    return bad
633

    
634
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
635
    """Verify if there are any unknown volumes in the cluster.
636

637
    The .os, .swap and backup volumes are ignored. All other volumes are
638
    reported as unknown.
639

640
    """
641
    bad = False
642

    
643
    for node in node_vol_is:
644
      for volume in node_vol_is[node]:
645
        if node not in node_vol_should or volume not in node_vol_should[node]:
646
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
647
                      (volume, node))
648
          bad = True
649
    return bad
650

    
651
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
652
    """Verify the list of running instances.
653

654
    This checks what instances are running but unknown to the cluster.
655

656
    """
657
    bad = False
658
    for node in node_instance:
659
      for runninginstance in node_instance[node]:
660
        if runninginstance not in instancelist:
661
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
662
                          (runninginstance, node))
663
          bad = True
664
    return bad
665

    
666
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
667
    """Verify N+1 Memory Resilience.
668

669
    Check that if one single node dies we can still start all the instances it
670
    was primary for.
671

672
    """
673
    bad = False
674

    
675
    for node, nodeinfo in node_info.iteritems():
676
      # This code checks that every node which is now listed as secondary has
677
      # enough memory to host all instances it is supposed to should a single
678
      # other node in the cluster fail.
679
      # FIXME: not ready for failover to an arbitrary node
680
      # FIXME: does not support file-backed instances
681
      # WARNING: we currently take into account down instances as well as up
682
      # ones, considering that even if they're down someone might want to start
683
      # them even in the event of a node failure.
684
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
685
        needed_mem = 0
686
        for instance in instances:
687
          needed_mem += instance_cfg[instance].memory
688
        if nodeinfo['mfree'] < needed_mem:
689
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
690
                      " failovers should node %s fail" % (node, prinode))
691
          bad = True
692
    return bad
693

    
694
  def CheckPrereq(self):
695
    """Check prerequisites.
696

697
    Transform the list of checks we're going to skip into a set and check that
698
    all its members are valid.
699

700
    """
701
    self.skip_set = frozenset(self.op.skip_checks)
702
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
703
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
704

    
705
  def BuildHooksEnv(self):
706
    """Build hooks env.
707

708
    Cluster-Verify hooks just rone in the post phase and their failure makes
709
    the output be logged in the verify output and the verification to fail.
710

711
    """
712
    all_nodes = self.cfg.GetNodeList()
713
    # TODO: populate the environment with useful information for verify hooks
714
    env = {}
715
    return env, [], all_nodes
716

    
717
  def Exec(self, feedback_fn):
718
    """Verify integrity of cluster, performing various test on nodes.
719

720
    """
721
    bad = False
722
    feedback_fn("* Verifying global settings")
723
    for msg in self.cfg.VerifyConfig():
724
      feedback_fn("  - ERROR: %s" % msg)
725

    
726
    vg_name = self.cfg.GetVGName()
727
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
728
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
729
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
730
    i_non_redundant = [] # Non redundant instances
731
    node_volume = {}
732
    node_instance = {}
733
    node_info = {}
734
    instance_cfg = {}
735

    
736
    # FIXME: verify OS list
737
    # do local checksums
738
    file_names = list(self.sstore.GetFileList())
739
    file_names.append(constants.SSL_CERT_FILE)
740
    file_names.append(constants.CLUSTER_CONF_FILE)
741
    local_checksums = utils.FingerprintFiles(file_names)
742

    
743
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
744
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
745
    all_instanceinfo = rpc.call_instance_list(nodelist)
746
    all_vglist = rpc.call_vg_list(nodelist)
747
    node_verify_param = {
748
      'filelist': file_names,
749
      'nodelist': nodelist,
750
      'hypervisor': None,
751
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
752
                        for node in nodeinfo]
753
      }
754
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
755
    all_rversion = rpc.call_version(nodelist)
756
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
757

    
758
    for node in nodelist:
759
      feedback_fn("* Verifying node %s" % node)
760
      result = self._VerifyNode(node, file_names, local_checksums,
761
                                all_vglist[node], all_nvinfo[node],
762
                                all_rversion[node], feedback_fn)
763
      bad = bad or result
764

    
765
      # node_volume
766
      volumeinfo = all_volumeinfo[node]
767

    
768
      if isinstance(volumeinfo, basestring):
769
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
770
                    (node, volumeinfo[-400:].encode('string_escape')))
771
        bad = True
772
        node_volume[node] = {}
773
      elif not isinstance(volumeinfo, dict):
774
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
775
        bad = True
776
        continue
777
      else:
778
        node_volume[node] = volumeinfo
779

    
780
      # node_instance
781
      nodeinstance = all_instanceinfo[node]
782
      if type(nodeinstance) != list:
783
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
784
        bad = True
785
        continue
786

    
787
      node_instance[node] = nodeinstance
788

    
789
      # node_info
790
      nodeinfo = all_ninfo[node]
791
      if not isinstance(nodeinfo, dict):
792
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
793
        bad = True
794
        continue
795

    
796
      try:
797
        node_info[node] = {
798
          "mfree": int(nodeinfo['memory_free']),
799
          "dfree": int(nodeinfo['vg_free']),
800
          "pinst": [],
801
          "sinst": [],
802
          # dictionary holding all instances this node is secondary for,
803
          # grouped by their primary node. Each key is a cluster node, and each
804
          # value is a list of instances which have the key as primary and the
805
          # current node as secondary.  this is handy to calculate N+1 memory
806
          # availability if you can only failover from a primary to its
807
          # secondary.
808
          "sinst-by-pnode": {},
809
        }
810
      except ValueError:
811
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
812
        bad = True
813
        continue
814

    
815
    node_vol_should = {}
816

    
817
    for instance in instancelist:
818
      feedback_fn("* Verifying instance %s" % instance)
819
      inst_config = self.cfg.GetInstanceInfo(instance)
820
      result =  self._VerifyInstance(instance, inst_config, node_volume,
821
                                     node_instance, feedback_fn)
822
      bad = bad or result
823

    
824
      inst_config.MapLVsByNode(node_vol_should)
825

    
826
      instance_cfg[instance] = inst_config
827

    
828
      pnode = inst_config.primary_node
829
      if pnode in node_info:
830
        node_info[pnode]['pinst'].append(instance)
831
      else:
832
        feedback_fn("  - ERROR: instance %s, connection to primary node"
833
                    " %s failed" % (instance, pnode))
834
        bad = True
835

    
836
      # If the instance is non-redundant we cannot survive losing its primary
837
      # node, so we are not N+1 compliant. On the other hand we have no disk
838
      # templates with more than one secondary so that situation is not well
839
      # supported either.
840
      # FIXME: does not support file-backed instances
841
      if len(inst_config.secondary_nodes) == 0:
842
        i_non_redundant.append(instance)
843
      elif len(inst_config.secondary_nodes) > 1:
844
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
845
                    % instance)
846

    
847
      for snode in inst_config.secondary_nodes:
848
        if snode in node_info:
849
          node_info[snode]['sinst'].append(instance)
850
          if pnode not in node_info[snode]['sinst-by-pnode']:
851
            node_info[snode]['sinst-by-pnode'][pnode] = []
852
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
853
        else:
854
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
855
                      " %s failed" % (instance, snode))
856

    
857
    feedback_fn("* Verifying orphan volumes")
858
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
859
                                       feedback_fn)
860
    bad = bad or result
861

    
862
    feedback_fn("* Verifying remaining instances")
863
    result = self._VerifyOrphanInstances(instancelist, node_instance,
864
                                         feedback_fn)
865
    bad = bad or result
866

    
867
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
868
      feedback_fn("* Verifying N+1 Memory redundancy")
869
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
870
      bad = bad or result
871

    
872
    feedback_fn("* Other Notes")
873
    if i_non_redundant:
874
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
875
                  % len(i_non_redundant))
876

    
877
    return not bad
878

    
879
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
880
    """Analize the post-hooks' result, handle it, and send some
881
    nicely-formatted feedback back to the user.
882

883
    Args:
884
      phase: the hooks phase that has just been run
885
      hooks_results: the results of the multi-node hooks rpc call
886
      feedback_fn: function to send feedback back to the caller
887
      lu_result: previous Exec result
888

889
    """
890
    # We only really run POST phase hooks, and are only interested in
891
    # their results
892
    if phase == constants.HOOKS_PHASE_POST:
893
      # Used to change hooks' output to proper indentation
894
      indent_re = re.compile('^', re.M)
895
      feedback_fn("* Hooks Results")
896
      if not hooks_results:
897
        feedback_fn("  - ERROR: general communication failure")
898
        lu_result = 1
899
      else:
900
        for node_name in hooks_results:
901
          show_node_header = True
902
          res = hooks_results[node_name]
903
          if res is False or not isinstance(res, list):
904
            feedback_fn("    Communication failure")
905
            lu_result = 1
906
            continue
907
          for script, hkr, output in res:
908
            if hkr == constants.HKR_FAIL:
909
              # The node header is only shown once, if there are
910
              # failing hooks on that node
911
              if show_node_header:
912
                feedback_fn("  Node %s:" % node_name)
913
                show_node_header = False
914
              feedback_fn("    ERROR: Script %s failed, output:" % script)
915
              output = indent_re.sub('      ', output)
916
              feedback_fn("%s" % output)
917
              lu_result = 1
918

    
919
      return lu_result
920

    
921

    
922
class LUVerifyDisks(NoHooksLU):
923
  """Verifies the cluster disks status.
924

925
  """
926
  _OP_REQP = []
927
  REQ_BGL = False
928

    
929
  def ExpandNames(self):
930
    self.needed_locks = {
931
      locking.LEVEL_NODE: locking.ALL_SET,
932
      locking.LEVEL_INSTANCE: locking.ALL_SET,
933
    }
934
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
935

    
936
  def CheckPrereq(self):
937
    """Check prerequisites.
938

939
    This has no prerequisites.
940

941
    """
942
    pass
943

    
944
  def Exec(self, feedback_fn):
945
    """Verify integrity of cluster disks.
946

947
    """
948
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
949

    
950
    vg_name = self.cfg.GetVGName()
951
    nodes = utils.NiceSort(self.cfg.GetNodeList())
952
    instances = [self.cfg.GetInstanceInfo(name)
953
                 for name in self.cfg.GetInstanceList()]
954

    
955
    nv_dict = {}
956
    for inst in instances:
957
      inst_lvs = {}
958
      if (inst.status != "up" or
959
          inst.disk_template not in constants.DTS_NET_MIRROR):
960
        continue
961
      inst.MapLVsByNode(inst_lvs)
962
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
963
      for node, vol_list in inst_lvs.iteritems():
964
        for vol in vol_list:
965
          nv_dict[(node, vol)] = inst
966

    
967
    if not nv_dict:
968
      return result
969

    
970
    node_lvs = rpc.call_volume_list(nodes, vg_name)
971

    
972
    to_act = set()
973
    for node in nodes:
974
      # node_volume
975
      lvs = node_lvs[node]
976

    
977
      if isinstance(lvs, basestring):
978
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
979
        res_nlvm[node] = lvs
980
      elif not isinstance(lvs, dict):
981
        logger.Info("connection to node %s failed or invalid data returned" %
982
                    (node,))
983
        res_nodes.append(node)
984
        continue
985

    
986
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
987
        inst = nv_dict.pop((node, lv_name), None)
988
        if (not lv_online and inst is not None
989
            and inst.name not in res_instances):
990
          res_instances.append(inst.name)
991

    
992
    # any leftover items in nv_dict are missing LVs, let's arrange the
993
    # data better
994
    for key, inst in nv_dict.iteritems():
995
      if inst.name not in res_missing:
996
        res_missing[inst.name] = []
997
      res_missing[inst.name].append(key)
998

    
999
    return result
1000

    
1001

    
1002
class LURenameCluster(LogicalUnit):
1003
  """Rename the cluster.
1004

1005
  """
1006
  HPATH = "cluster-rename"
1007
  HTYPE = constants.HTYPE_CLUSTER
1008
  _OP_REQP = ["name"]
1009
  REQ_WSSTORE = True
1010

    
1011
  def BuildHooksEnv(self):
1012
    """Build hooks env.
1013

1014
    """
1015
    env = {
1016
      "OP_TARGET": self.sstore.GetClusterName(),
1017
      "NEW_NAME": self.op.name,
1018
      }
1019
    mn = self.sstore.GetMasterNode()
1020
    return env, [mn], [mn]
1021

    
1022
  def CheckPrereq(self):
1023
    """Verify that the passed name is a valid one.
1024

1025
    """
1026
    hostname = utils.HostInfo(self.op.name)
1027

    
1028
    new_name = hostname.name
1029
    self.ip = new_ip = hostname.ip
1030
    old_name = self.sstore.GetClusterName()
1031
    old_ip = self.sstore.GetMasterIP()
1032
    if new_name == old_name and new_ip == old_ip:
1033
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1034
                                 " cluster has changed")
1035
    if new_ip != old_ip:
1036
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1037
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1038
                                   " reachable on the network. Aborting." %
1039
                                   new_ip)
1040

    
1041
    self.op.name = new_name
1042

    
1043
  def Exec(self, feedback_fn):
1044
    """Rename the cluster.
1045

1046
    """
1047
    clustername = self.op.name
1048
    ip = self.ip
1049
    ss = self.sstore
1050

    
1051
    # shutdown the master IP
1052
    master = ss.GetMasterNode()
1053
    if not rpc.call_node_stop_master(master, False):
1054
      raise errors.OpExecError("Could not disable the master role")
1055

    
1056
    try:
1057
      # modify the sstore
1058
      ss.SetKey(ss.SS_MASTER_IP, ip)
1059
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1060

    
1061
      # Distribute updated ss config to all nodes
1062
      myself = self.cfg.GetNodeInfo(master)
1063
      dist_nodes = self.cfg.GetNodeList()
1064
      if myself.name in dist_nodes:
1065
        dist_nodes.remove(myself.name)
1066

    
1067
      logger.Debug("Copying updated ssconf data to all nodes")
1068
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1069
        fname = ss.KeyToFilename(keyname)
1070
        result = rpc.call_upload_file(dist_nodes, fname)
1071
        for to_node in dist_nodes:
1072
          if not result[to_node]:
1073
            logger.Error("copy of file %s to node %s failed" %
1074
                         (fname, to_node))
1075
    finally:
1076
      if not rpc.call_node_start_master(master, False):
1077
        logger.Error("Could not re-enable the master role on the master,"
1078
                     " please restart manually.")
1079

    
1080

    
1081
def _RecursiveCheckIfLVMBased(disk):
1082
  """Check if the given disk or its children are lvm-based.
1083

1084
  Args:
1085
    disk: ganeti.objects.Disk object
1086

1087
  Returns:
1088
    boolean indicating whether a LD_LV dev_type was found or not
1089

1090
  """
1091
  if disk.children:
1092
    for chdisk in disk.children:
1093
      if _RecursiveCheckIfLVMBased(chdisk):
1094
        return True
1095
  return disk.dev_type == constants.LD_LV
1096

    
1097

    
1098
class LUSetClusterParams(LogicalUnit):
1099
  """Change the parameters of the cluster.
1100

1101
  """
1102
  HPATH = "cluster-modify"
1103
  HTYPE = constants.HTYPE_CLUSTER
1104
  _OP_REQP = []
1105
  REQ_BGL = False
1106

    
1107
  def ExpandNames(self):
1108
    # FIXME: in the future maybe other cluster params won't require checking on
1109
    # all nodes to be modified.
1110
    self.needed_locks = {
1111
      locking.LEVEL_NODE: locking.ALL_SET,
1112
    }
1113
    self.share_locks[locking.LEVEL_NODE] = 1
1114

    
1115
  def BuildHooksEnv(self):
1116
    """Build hooks env.
1117

1118
    """
1119
    env = {
1120
      "OP_TARGET": self.sstore.GetClusterName(),
1121
      "NEW_VG_NAME": self.op.vg_name,
1122
      }
1123
    mn = self.sstore.GetMasterNode()
1124
    return env, [mn], [mn]
1125

    
1126
  def CheckPrereq(self):
1127
    """Check prerequisites.
1128

1129
    This checks whether the given params don't conflict and
1130
    if the given volume group is valid.
1131

1132
    """
1133
    # FIXME: This only works because there is only one parameter that can be
1134
    # changed or removed.
1135
    if not self.op.vg_name:
1136
      instances = self.cfg.GetAllInstancesInfo().values()
1137
      for inst in instances:
1138
        for disk in inst.disks:
1139
          if _RecursiveCheckIfLVMBased(disk):
1140
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1141
                                       " lvm-based instances exist")
1142

    
1143
    # if vg_name not None, checks given volume group on all nodes
1144
    if self.op.vg_name:
1145
      node_list = self.acquired_locks[locking.LEVEL_NODE]
1146
      vglist = rpc.call_vg_list(node_list)
1147
      for node in node_list:
1148
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1149
                                              constants.MIN_VG_SIZE)
1150
        if vgstatus:
1151
          raise errors.OpPrereqError("Error on node '%s': %s" %
1152
                                     (node, vgstatus))
1153

    
1154
  def Exec(self, feedback_fn):
1155
    """Change the parameters of the cluster.
1156

1157
    """
1158
    if self.op.vg_name != self.cfg.GetVGName():
1159
      self.cfg.SetVGName(self.op.vg_name)
1160
    else:
1161
      feedback_fn("Cluster LVM configuration already in desired"
1162
                  " state, not changing")
1163

    
1164

    
1165
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1166
  """Sleep and poll for an instance's disk to sync.
1167

1168
  """
1169
  if not instance.disks:
1170
    return True
1171

    
1172
  if not oneshot:
1173
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1174

    
1175
  node = instance.primary_node
1176

    
1177
  for dev in instance.disks:
1178
    cfgw.SetDiskID(dev, node)
1179

    
1180
  retries = 0
1181
  while True:
1182
    max_time = 0
1183
    done = True
1184
    cumul_degraded = False
1185
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1186
    if not rstats:
1187
      proc.LogWarning("Can't get any data from node %s" % node)
1188
      retries += 1
1189
      if retries >= 10:
1190
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1191
                                 " aborting." % node)
1192
      time.sleep(6)
1193
      continue
1194
    retries = 0
1195
    for i in range(len(rstats)):
1196
      mstat = rstats[i]
1197
      if mstat is None:
1198
        proc.LogWarning("Can't compute data for node %s/%s" %
1199
                        (node, instance.disks[i].iv_name))
1200
        continue
1201
      # we ignore the ldisk parameter
1202
      perc_done, est_time, is_degraded, _ = mstat
1203
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1204
      if perc_done is not None:
1205
        done = False
1206
        if est_time is not None:
1207
          rem_time = "%d estimated seconds remaining" % est_time
1208
          max_time = est_time
1209
        else:
1210
          rem_time = "no time estimate"
1211
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
1212
                     (instance.disks[i].iv_name, perc_done, rem_time))
1213
    if done or oneshot:
1214
      break
1215

    
1216
    time.sleep(min(60, max_time))
1217

    
1218
  if done:
1219
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1220
  return not cumul_degraded
1221

    
1222

    
1223
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1224
  """Check that mirrors are not degraded.
1225

1226
  The ldisk parameter, if True, will change the test from the
1227
  is_degraded attribute (which represents overall non-ok status for
1228
  the device(s)) to the ldisk (representing the local storage status).
1229

1230
  """
1231
  cfgw.SetDiskID(dev, node)
1232
  if ldisk:
1233
    idx = 6
1234
  else:
1235
    idx = 5
1236

    
1237
  result = True
1238
  if on_primary or dev.AssembleOnSecondary():
1239
    rstats = rpc.call_blockdev_find(node, dev)
1240
    if not rstats:
1241
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1242
      result = False
1243
    else:
1244
      result = result and (not rstats[idx])
1245
  if dev.children:
1246
    for child in dev.children:
1247
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1248

    
1249
  return result
1250

    
1251

    
1252
class LUDiagnoseOS(NoHooksLU):
1253
  """Logical unit for OS diagnose/query.
1254

1255
  """
1256
  _OP_REQP = ["output_fields", "names"]
1257
  REQ_BGL = False
1258

    
1259
  def ExpandNames(self):
1260
    if self.op.names:
1261
      raise errors.OpPrereqError("Selective OS query not supported")
1262

    
1263
    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1264
    _CheckOutputFields(static=[],
1265
                       dynamic=self.dynamic_fields,
1266
                       selected=self.op.output_fields)
1267

    
1268
    # Lock all nodes, in shared mode
1269
    self.needed_locks = {}
1270
    self.share_locks[locking.LEVEL_NODE] = 1
1271
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1272

    
1273
  def CheckPrereq(self):
1274
    """Check prerequisites.
1275

1276
    """
1277

    
1278
  @staticmethod
1279
  def _DiagnoseByOS(node_list, rlist):
1280
    """Remaps a per-node return list into an a per-os per-node dictionary
1281

1282
      Args:
1283
        node_list: a list with the names of all nodes
1284
        rlist: a map with node names as keys and OS objects as values
1285

1286
      Returns:
1287
        map: a map with osnames as keys and as value another map, with
1288
             nodes as
1289
             keys and list of OS objects as values
1290
             e.g. {"debian-etch": {"node1": [<object>,...],
1291
                                   "node2": [<object>,]}
1292
                  }
1293

1294
    """
1295
    all_os = {}
1296
    for node_name, nr in rlist.iteritems():
1297
      if not nr:
1298
        continue
1299
      for os_obj in nr:
1300
        if os_obj.name not in all_os:
1301
          # build a list of nodes for this os containing empty lists
1302
          # for each node in node_list
1303
          all_os[os_obj.name] = {}
1304
          for nname in node_list:
1305
            all_os[os_obj.name][nname] = []
1306
        all_os[os_obj.name][node_name].append(os_obj)
1307
    return all_os
1308

    
1309
  def Exec(self, feedback_fn):
1310
    """Compute the list of OSes.
1311

1312
    """
1313
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1314
    node_data = rpc.call_os_diagnose(node_list)
1315
    if node_data == False:
1316
      raise errors.OpExecError("Can't gather the list of OSes")
1317
    pol = self._DiagnoseByOS(node_list, node_data)
1318
    output = []
1319
    for os_name, os_data in pol.iteritems():
1320
      row = []
1321
      for field in self.op.output_fields:
1322
        if field == "name":
1323
          val = os_name
1324
        elif field == "valid":
1325
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1326
        elif field == "node_status":
1327
          val = {}
1328
          for node_name, nos_list in os_data.iteritems():
1329
            val[node_name] = [(v.status, v.path) for v in nos_list]
1330
        else:
1331
          raise errors.ParameterError(field)
1332
        row.append(val)
1333
      output.append(row)
1334

    
1335
    return output
1336

    
1337

    
1338
class LURemoveNode(LogicalUnit):
1339
  """Logical unit for removing a node.
1340

1341
  """
1342
  HPATH = "node-remove"
1343
  HTYPE = constants.HTYPE_NODE
1344
  _OP_REQP = ["node_name"]
1345

    
1346
  def BuildHooksEnv(self):
1347
    """Build hooks env.
1348

1349
    This doesn't run on the target node in the pre phase as a failed
1350
    node would then be impossible to remove.
1351

1352
    """
1353
    env = {
1354
      "OP_TARGET": self.op.node_name,
1355
      "NODE_NAME": self.op.node_name,
1356
      }
1357
    all_nodes = self.cfg.GetNodeList()
1358
    all_nodes.remove(self.op.node_name)
1359
    return env, all_nodes, all_nodes
1360

    
1361
  def CheckPrereq(self):
1362
    """Check prerequisites.
1363

1364
    This checks:
1365
     - the node exists in the configuration
1366
     - it does not have primary or secondary instances
1367
     - it's not the master
1368

1369
    Any errors are signalled by raising errors.OpPrereqError.
1370

1371
    """
1372
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1373
    if node is None:
1374
      raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1375

    
1376
    instance_list = self.cfg.GetInstanceList()
1377

    
1378
    masternode = self.sstore.GetMasterNode()
1379
    if node.name == masternode:
1380
      raise errors.OpPrereqError("Node is the master node,"
1381
                                 " you need to failover first.")
1382

    
1383
    for instance_name in instance_list:
1384
      instance = self.cfg.GetInstanceInfo(instance_name)
1385
      if node.name == instance.primary_node:
1386
        raise errors.OpPrereqError("Instance %s still running on the node,"
1387
                                   " please remove first." % instance_name)
1388
      if node.name in instance.secondary_nodes:
1389
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1390
                                   " please remove first." % instance_name)
1391
    self.op.node_name = node.name
1392
    self.node = node
1393

    
1394
  def Exec(self, feedback_fn):
1395
    """Removes the node from the cluster.
1396

1397
    """
1398
    node = self.node
1399
    logger.Info("stopping the node daemon and removing configs from node %s" %
1400
                node.name)
1401

    
1402
    self.context.RemoveNode(node.name)
1403

    
1404
    rpc.call_node_leave_cluster(node.name)
1405

    
1406

    
1407
class LUQueryNodes(NoHooksLU):
1408
  """Logical unit for querying nodes.
1409

1410
  """
1411
  _OP_REQP = ["output_fields", "names"]
1412
  REQ_BGL = False
1413

    
1414
  def ExpandNames(self):
1415
    self.dynamic_fields = frozenset([
1416
      "dtotal", "dfree",
1417
      "mtotal", "mnode", "mfree",
1418
      "bootid",
1419
      "ctotal",
1420
      ])
1421

    
1422
    self.static_fields = frozenset([
1423
      "name", "pinst_cnt", "sinst_cnt",
1424
      "pinst_list", "sinst_list",
1425
      "pip", "sip", "tags",
1426
      "serial_no",
1427
      ])
1428

    
1429
    _CheckOutputFields(static=self.static_fields,
1430
                       dynamic=self.dynamic_fields,
1431
                       selected=self.op.output_fields)
1432

    
1433
    self.needed_locks = {}
1434
    self.share_locks[locking.LEVEL_NODE] = 1
1435

    
1436
    if self.op.names:
1437
      self.wanted = _GetWantedNodes(self, self.op.names)
1438
    else:
1439
      self.wanted = locking.ALL_SET
1440

    
1441
    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
1442
    if self.do_locking:
1443
      # if we don't request only static fields, we need to lock the nodes
1444
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
1445

    
1446

    
1447
  def CheckPrereq(self):
1448
    """Check prerequisites.
1449

1450
    """
1451
    # The validation of the node list is done in the _GetWantedNodes,
1452
    # if non empty, and if empty, there's no validation to do
1453
    pass
1454

    
1455
  def Exec(self, feedback_fn):
1456
    """Computes the list of nodes and their attributes.
1457

1458
    """
1459
    all_info = self.cfg.GetAllNodesInfo()
1460
    if self.do_locking:
1461
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1462
    elif self.wanted != locking.ALL_SET:
1463
      nodenames = self.wanted
1464
      missing = set(nodenames).difference(all_info.keys())
1465
      if missing:
1466
        raise self.OpExecError(
1467
          "Some nodes were removed before retrieving their data: %s" % missing)
1468
    else:
1469
      nodenames = all_info.keys()
1470
    nodelist = [all_info[name] for name in nodenames]
1471

    
1472
    # begin data gathering
1473

    
1474
    if self.dynamic_fields.intersection(self.op.output_fields):
1475
      live_data = {}
1476
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1477
      for name in nodenames:
1478
        nodeinfo = node_data.get(name, None)
1479
        if nodeinfo:
1480
          live_data[name] = {
1481
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1482
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1483
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1484
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1485
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1486
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1487
            "bootid": nodeinfo['bootid'],
1488
            }
1489
        else:
1490
          live_data[name] = {}
1491
    else:
1492
      live_data = dict.fromkeys(nodenames, {})
1493

    
1494
    node_to_primary = dict([(name, set()) for name in nodenames])
1495
    node_to_secondary = dict([(name, set()) for name in nodenames])
1496

    
1497
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1498
                             "sinst_cnt", "sinst_list"))
1499
    if inst_fields & frozenset(self.op.output_fields):
1500
      instancelist = self.cfg.GetInstanceList()
1501

    
1502
      for instance_name in instancelist:
1503
        inst = self.cfg.GetInstanceInfo(instance_name)
1504
        if inst.primary_node in node_to_primary:
1505
          node_to_primary[inst.primary_node].add(inst.name)
1506
        for secnode in inst.secondary_nodes:
1507
          if secnode in node_to_secondary:
1508
            node_to_secondary[secnode].add(inst.name)
1509

    
1510
    # end data gathering
1511

    
1512
    output = []
1513
    for node in nodelist:
1514
      node_output = []
1515
      for field in self.op.output_fields:
1516
        if field == "name":
1517
          val = node.name
1518
        elif field == "pinst_list":
1519
          val = list(node_to_primary[node.name])
1520
        elif field == "sinst_list":
1521
          val = list(node_to_secondary[node.name])
1522
        elif field == "pinst_cnt":
1523
          val = len(node_to_primary[node.name])
1524
        elif field == "sinst_cnt":
1525
          val = len(node_to_secondary[node.name])
1526
        elif field == "pip":
1527
          val = node.primary_ip
1528
        elif field == "sip":
1529
          val = node.secondary_ip
1530
        elif field == "tags":
1531
          val = list(node.GetTags())
1532
        elif field == "serial_no":
1533
          val = node.serial_no
1534
        elif field in self.dynamic_fields:
1535
          val = live_data[node.name].get(field, None)
1536
        else:
1537
          raise errors.ParameterError(field)
1538
        node_output.append(val)
1539
      output.append(node_output)
1540

    
1541
    return output
1542

    
1543

    
1544
class LUQueryNodeVolumes(NoHooksLU):
1545
  """Logical unit for getting volumes on node(s).
1546

1547
  """
1548
  _OP_REQP = ["nodes", "output_fields"]
1549
  REQ_BGL = False
1550

    
1551
  def ExpandNames(self):
1552
    _CheckOutputFields(static=["node"],
1553
                       dynamic=["phys", "vg", "name", "size", "instance"],
1554
                       selected=self.op.output_fields)
1555

    
1556
    self.needed_locks = {}
1557
    self.share_locks[locking.LEVEL_NODE] = 1
1558
    if not self.op.nodes:
1559
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1560
    else:
1561
      self.needed_locks[locking.LEVEL_NODE] = \
1562
        _GetWantedNodes(self, self.op.nodes)
1563

    
1564
  def CheckPrereq(self):
1565
    """Check prerequisites.
1566

1567
    This checks that the fields required are valid output fields.
1568

1569
    """
1570
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1571

    
1572
  def Exec(self, feedback_fn):
1573
    """Computes the list of nodes and their attributes.
1574

1575
    """
1576
    nodenames = self.nodes
1577
    volumes = rpc.call_node_volumes(nodenames)
1578

    
1579
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1580
             in self.cfg.GetInstanceList()]
1581

    
1582
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1583

    
1584
    output = []
1585
    for node in nodenames:
1586
      if node not in volumes or not volumes[node]:
1587
        continue
1588

    
1589
      node_vols = volumes[node][:]
1590
      node_vols.sort(key=lambda vol: vol['dev'])
1591

    
1592
      for vol in node_vols:
1593
        node_output = []
1594
        for field in self.op.output_fields:
1595
          if field == "node":
1596
            val = node
1597
          elif field == "phys":
1598
            val = vol['dev']
1599
          elif field == "vg":
1600
            val = vol['vg']
1601
          elif field == "name":
1602
            val = vol['name']
1603
          elif field == "size":
1604
            val = int(float(vol['size']))
1605
          elif field == "instance":
1606
            for inst in ilist:
1607
              if node not in lv_by_node[inst]:
1608
                continue
1609
              if vol['name'] in lv_by_node[inst][node]:
1610
                val = inst.name
1611
                break
1612
            else:
1613
              val = '-'
1614
          else:
1615
            raise errors.ParameterError(field)
1616
          node_output.append(str(val))
1617

    
1618
        output.append(node_output)
1619

    
1620
    return output
1621

    
1622

    
1623
class LUAddNode(LogicalUnit):
1624
  """Logical unit for adding node to the cluster.
1625

1626
  """
1627
  HPATH = "node-add"
1628
  HTYPE = constants.HTYPE_NODE
1629
  _OP_REQP = ["node_name"]
1630

    
1631
  def BuildHooksEnv(self):
1632
    """Build hooks env.
1633

1634
    This will run on all nodes before, and on all nodes + the new node after.
1635

1636
    """
1637
    env = {
1638
      "OP_TARGET": self.op.node_name,
1639
      "NODE_NAME": self.op.node_name,
1640
      "NODE_PIP": self.op.primary_ip,
1641
      "NODE_SIP": self.op.secondary_ip,
1642
      }
1643
    nodes_0 = self.cfg.GetNodeList()
1644
    nodes_1 = nodes_0 + [self.op.node_name, ]
1645
    return env, nodes_0, nodes_1
1646

    
1647
  def CheckPrereq(self):
1648
    """Check prerequisites.
1649

1650
    This checks:
1651
     - the new node is not already in the config
1652
     - it is resolvable
1653
     - its parameters (single/dual homed) matches the cluster
1654

1655
    Any errors are signalled by raising errors.OpPrereqError.
1656

1657
    """
1658
    node_name = self.op.node_name
1659
    cfg = self.cfg
1660

    
1661
    dns_data = utils.HostInfo(node_name)
1662

    
1663
    node = dns_data.name
1664
    primary_ip = self.op.primary_ip = dns_data.ip
1665
    secondary_ip = getattr(self.op, "secondary_ip", None)
1666
    if secondary_ip is None:
1667
      secondary_ip = primary_ip
1668
    if not utils.IsValidIP(secondary_ip):
1669
      raise errors.OpPrereqError("Invalid secondary IP given")
1670
    self.op.secondary_ip = secondary_ip
1671

    
1672
    node_list = cfg.GetNodeList()
1673
    if not self.op.readd and node in node_list:
1674
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1675
                                 node)
1676
    elif self.op.readd and node not in node_list:
1677
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1678

    
1679
    for existing_node_name in node_list:
1680
      existing_node = cfg.GetNodeInfo(existing_node_name)
1681

    
1682
      if self.op.readd and node == existing_node_name:
1683
        if (existing_node.primary_ip != primary_ip or
1684
            existing_node.secondary_ip != secondary_ip):
1685
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1686
                                     " address configuration as before")
1687
        continue
1688

    
1689
      if (existing_node.primary_ip == primary_ip or
1690
          existing_node.secondary_ip == primary_ip or
1691
          existing_node.primary_ip == secondary_ip or
1692
          existing_node.secondary_ip == secondary_ip):
1693
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1694
                                   " existing node %s" % existing_node.name)
1695

    
1696
    # check that the type of the node (single versus dual homed) is the
1697
    # same as for the master
1698
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1699
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1700
    newbie_singlehomed = secondary_ip == primary_ip
1701
    if master_singlehomed != newbie_singlehomed:
1702
      if master_singlehomed:
1703
        raise errors.OpPrereqError("The master has no private ip but the"
1704
                                   " new node has one")
1705
      else:
1706
        raise errors.OpPrereqError("The master has a private ip but the"
1707
                                   " new node doesn't have one")
1708

    
1709
    # checks reachablity
1710
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1711
      raise errors.OpPrereqError("Node not reachable by ping")
1712

    
1713
    if not newbie_singlehomed:
1714
      # check reachability from my secondary ip to newbie's secondary ip
1715
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1716
                           source=myself.secondary_ip):
1717
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1718
                                   " based ping to noded port")
1719

    
1720
    self.new_node = objects.Node(name=node,
1721
                                 primary_ip=primary_ip,
1722
                                 secondary_ip=secondary_ip)
1723

    
1724
  def Exec(self, feedback_fn):
1725
    """Adds the new node to the cluster.
1726

1727
    """
1728
    new_node = self.new_node
1729
    node = new_node.name
1730

    
1731
    # check connectivity
1732
    result = rpc.call_version([node])[node]
1733
    if result:
1734
      if constants.PROTOCOL_VERSION == result:
1735
        logger.Info("communication to node %s fine, sw version %s match" %
1736
                    (node, result))
1737
      else:
1738
        raise errors.OpExecError("Version mismatch master version %s,"
1739
                                 " node version %s" %
1740
                                 (constants.PROTOCOL_VERSION, result))
1741
    else:
1742
      raise errors.OpExecError("Cannot get version from the new node")
1743

    
1744
    # setup ssh on node
1745
    logger.Info("copy ssh key to node %s" % node)
1746
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1747
    keyarray = []
1748
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1749
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1750
                priv_key, pub_key]
1751

    
1752
    for i in keyfiles:
1753
      f = open(i, 'r')
1754
      try:
1755
        keyarray.append(f.read())
1756
      finally:
1757
        f.close()
1758

    
1759
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1760
                               keyarray[3], keyarray[4], keyarray[5])
1761

    
1762
    if not result:
1763
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1764

    
1765
    # Add node to our /etc/hosts, and add key to known_hosts
1766
    utils.AddHostToEtcHosts(new_node.name)
1767

    
1768
    if new_node.secondary_ip != new_node.primary_ip:
1769
      if not rpc.call_node_tcp_ping(new_node.name,
1770
                                    constants.LOCALHOST_IP_ADDRESS,
1771
                                    new_node.secondary_ip,
1772
                                    constants.DEFAULT_NODED_PORT,
1773
                                    10, False):
1774
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1775
                                 " you gave (%s). Please fix and re-run this"
1776
                                 " command." % new_node.secondary_ip)
1777

    
1778
    node_verify_list = [self.sstore.GetMasterNode()]
1779
    node_verify_param = {
1780
      'nodelist': [node],
1781
      # TODO: do a node-net-test as well?
1782
    }
1783

    
1784
    result = rpc.call_node_verify(node_verify_list, node_verify_param)
1785
    for verifier in node_verify_list:
1786
      if not result[verifier]:
1787
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
1788
                                 " for remote verification" % verifier)
1789
      if result[verifier]['nodelist']:
1790
        for failed in result[verifier]['nodelist']:
1791
          feedback_fn("ssh/hostname verification failed %s -> %s" %
1792
                      (verifier, result[verifier]['nodelist'][failed]))
1793
        raise errors.OpExecError("ssh/hostname verification failed.")
1794

    
1795
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1796
    # including the node just added
1797
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1798
    dist_nodes = self.cfg.GetNodeList()
1799
    if not self.op.readd:
1800
      dist_nodes.append(node)
1801
    if myself.name in dist_nodes:
1802
      dist_nodes.remove(myself.name)
1803

    
1804
    logger.Debug("Copying hosts and known_hosts to all nodes")
1805
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1806
      result = rpc.call_upload_file(dist_nodes, fname)
1807
      for to_node in dist_nodes:
1808
        if not result[to_node]:
1809
          logger.Error("copy of file %s to node %s failed" %
1810
                       (fname, to_node))
1811

    
1812
    to_copy = self.sstore.GetFileList()
1813
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1814
      to_copy.append(constants.VNC_PASSWORD_FILE)
1815
    for fname in to_copy:
1816
      result = rpc.call_upload_file([node], fname)
1817
      if not result[node]:
1818
        logger.Error("could not copy file %s to node %s" % (fname, node))
1819

    
1820
    if self.op.readd:
1821
      self.context.ReaddNode(new_node)
1822
    else:
1823
      self.context.AddNode(new_node)
1824

    
1825

    
1826
class LUQueryClusterInfo(NoHooksLU):
1827
  """Query cluster configuration.
1828

1829
  """
1830
  _OP_REQP = []
1831
  REQ_MASTER = False
1832
  REQ_BGL = False
1833

    
1834
  def ExpandNames(self):
1835
    self.needed_locks = {}
1836

    
1837
  def CheckPrereq(self):
1838
    """No prerequsites needed for this LU.
1839

1840
    """
1841
    pass
1842

    
1843
  def Exec(self, feedback_fn):
1844
    """Return cluster config.
1845

1846
    """
1847
    result = {
1848
      "name": self.sstore.GetClusterName(),
1849
      "software_version": constants.RELEASE_VERSION,
1850
      "protocol_version": constants.PROTOCOL_VERSION,
1851
      "config_version": constants.CONFIG_VERSION,
1852
      "os_api_version": constants.OS_API_VERSION,
1853
      "export_version": constants.EXPORT_VERSION,
1854
      "master": self.sstore.GetMasterNode(),
1855
      "architecture": (platform.architecture()[0], platform.machine()),
1856
      "hypervisor_type": self.sstore.GetHypervisorType(),
1857
      }
1858

    
1859
    return result
1860

    
1861

    
1862
class LUDumpClusterConfig(NoHooksLU):
1863
  """Return a text-representation of the cluster-config.
1864

1865
  """
1866
  _OP_REQP = []
1867
  REQ_BGL = False
1868

    
1869
  def ExpandNames(self):
1870
    self.needed_locks = {}
1871

    
1872
  def CheckPrereq(self):
1873
    """No prerequisites.
1874

1875
    """
1876
    pass
1877

    
1878
  def Exec(self, feedback_fn):
1879
    """Dump a representation of the cluster config to the standard output.
1880

1881
    """
1882
    return self.cfg.DumpConfig()
1883

    
1884

    
1885
class LUActivateInstanceDisks(NoHooksLU):
1886
  """Bring up an instance's disks.
1887

1888
  """
1889
  _OP_REQP = ["instance_name"]
1890
  REQ_BGL = False
1891

    
1892
  def ExpandNames(self):
1893
    self._ExpandAndLockInstance()
1894
    self.needed_locks[locking.LEVEL_NODE] = []
1895
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1896

    
1897
  def DeclareLocks(self, level):
1898
    if level == locking.LEVEL_NODE:
1899
      self._LockInstancesNodes()
1900

    
1901
  def CheckPrereq(self):
1902
    """Check prerequisites.
1903

1904
    This checks that the instance is in the cluster.
1905

1906
    """
1907
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1908
    assert self.instance is not None, \
1909
      "Cannot retrieve locked instance %s" % self.op.instance_name
1910

    
1911
  def Exec(self, feedback_fn):
1912
    """Activate the disks.
1913

1914
    """
1915
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1916
    if not disks_ok:
1917
      raise errors.OpExecError("Cannot activate block devices")
1918

    
1919
    return disks_info
1920

    
1921

    
1922
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1923
  """Prepare the block devices for an instance.
1924

1925
  This sets up the block devices on all nodes.
1926

1927
  Args:
1928
    instance: a ganeti.objects.Instance object
1929
    ignore_secondaries: if true, errors on secondary nodes won't result
1930
                        in an error return from the function
1931

1932
  Returns:
1933
    false if the operation failed
1934
    list of (host, instance_visible_name, node_visible_name) if the operation
1935
         suceeded with the mapping from node devices to instance devices
1936
  """
1937
  device_info = []
1938
  disks_ok = True
1939
  iname = instance.name
1940
  # With the two passes mechanism we try to reduce the window of
1941
  # opportunity for the race condition of switching DRBD to primary
1942
  # before handshaking occured, but we do not eliminate it
1943

    
1944
  # The proper fix would be to wait (with some limits) until the
1945
  # connection has been made and drbd transitions from WFConnection
1946
  # into any other network-connected state (Connected, SyncTarget,
1947
  # SyncSource, etc.)
1948

    
1949
  # 1st pass, assemble on all nodes in secondary mode
1950
  for inst_disk in instance.disks:
1951
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1952
      cfg.SetDiskID(node_disk, node)
1953
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1954
      if not result:
1955
        logger.Error("could not prepare block device %s on node %s"
1956
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1957
        if not ignore_secondaries:
1958
          disks_ok = False
1959

    
1960
  # FIXME: race condition on drbd migration to primary
1961

    
1962
  # 2nd pass, do only the primary node
1963
  for inst_disk in instance.disks:
1964
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1965
      if node != instance.primary_node:
1966
        continue
1967
      cfg.SetDiskID(node_disk, node)
1968
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1969
      if not result:
1970
        logger.Error("could not prepare block device %s on node %s"
1971
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1972
        disks_ok = False
1973
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
1974

    
1975
  # leave the disks configured for the primary node
1976
  # this is a workaround that would be fixed better by
1977
  # improving the logical/physical id handling
1978
  for disk in instance.disks:
1979
    cfg.SetDiskID(disk, instance.primary_node)
1980

    
1981
  return disks_ok, device_info
1982

    
1983

    
1984
def _StartInstanceDisks(cfg, instance, force):
1985
  """Start the disks of an instance.
1986

1987
  """
1988
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1989
                                           ignore_secondaries=force)
1990
  if not disks_ok:
1991
    _ShutdownInstanceDisks(instance, cfg)
1992
    if force is not None and not force:
1993
      logger.Error("If the message above refers to a secondary node,"
1994
                   " you can retry the operation using '--force'.")
1995
    raise errors.OpExecError("Disk consistency error")
1996

    
1997

    
1998
class LUDeactivateInstanceDisks(NoHooksLU):
1999
  """Shutdown an instance's disks.
2000

2001
  """
2002
  _OP_REQP = ["instance_name"]
2003
  REQ_BGL = False
2004

    
2005
  def ExpandNames(self):
2006
    self._ExpandAndLockInstance()
2007
    self.needed_locks[locking.LEVEL_NODE] = []
2008
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2009

    
2010
  def DeclareLocks(self, level):
2011
    if level == locking.LEVEL_NODE:
2012
      self._LockInstancesNodes()
2013

    
2014
  def CheckPrereq(self):
2015
    """Check prerequisites.
2016

2017
    This checks that the instance is in the cluster.
2018

2019
    """
2020
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2021
    assert self.instance is not None, \
2022
      "Cannot retrieve locked instance %s" % self.op.instance_name
2023

    
2024
  def Exec(self, feedback_fn):
2025
    """Deactivate the disks
2026

2027
    """
2028
    instance = self.instance
2029
    _SafeShutdownInstanceDisks(instance, self.cfg)
2030

    
2031

    
2032
def _SafeShutdownInstanceDisks(instance, cfg):
2033
  """Shutdown block devices of an instance.
2034

2035
  This function checks if an instance is running, before calling
2036
  _ShutdownInstanceDisks.
2037

2038
  """
2039
  ins_l = rpc.call_instance_list([instance.primary_node])
2040
  ins_l = ins_l[instance.primary_node]
2041
  if not type(ins_l) is list:
2042
    raise errors.OpExecError("Can't contact node '%s'" %
2043
                             instance.primary_node)
2044

    
2045
  if instance.name in ins_l:
2046
    raise errors.OpExecError("Instance is running, can't shutdown"
2047
                             " block devices.")
2048

    
2049
  _ShutdownInstanceDisks(instance, cfg)
2050

    
2051

    
2052
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
2053
  """Shutdown block devices of an instance.
2054

2055
  This does the shutdown on all nodes of the instance.
2056

2057
  If the ignore_primary is false, errors on the primary node are
2058
  ignored.
2059

2060
  """
2061
  result = True
2062
  for disk in instance.disks:
2063
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2064
      cfg.SetDiskID(top_disk, node)
2065
      if not rpc.call_blockdev_shutdown(node, top_disk):
2066
        logger.Error("could not shutdown block device %s on node %s" %
2067
                     (disk.iv_name, node))
2068
        if not ignore_primary or node != instance.primary_node:
2069
          result = False
2070
  return result
2071

    
2072

    
2073
def _CheckNodeFreeMemory(cfg, node, reason, requested):
2074
  """Checks if a node has enough free memory.
2075

2076
  This function check if a given node has the needed amount of free
2077
  memory. In case the node has less memory or we cannot get the
2078
  information from the node, this function raise an OpPrereqError
2079
  exception.
2080

2081
  Args:
2082
    - cfg: a ConfigWriter instance
2083
    - node: the node name
2084
    - reason: string to use in the error message
2085
    - requested: the amount of memory in MiB
2086

2087
  """
2088
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2089
  if not nodeinfo or not isinstance(nodeinfo, dict):
2090
    raise errors.OpPrereqError("Could not contact node %s for resource"
2091
                             " information" % (node,))
2092

    
2093
  free_mem = nodeinfo[node].get('memory_free')
2094
  if not isinstance(free_mem, int):
2095
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2096
                             " was '%s'" % (node, free_mem))
2097
  if requested > free_mem:
2098
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2099
                             " needed %s MiB, available %s MiB" %
2100
                             (node, reason, requested, free_mem))
2101

    
2102

    
2103
class LUStartupInstance(LogicalUnit):
2104
  """Starts an instance.
2105

2106
  """
2107
  HPATH = "instance-start"
2108
  HTYPE = constants.HTYPE_INSTANCE
2109
  _OP_REQP = ["instance_name", "force"]
2110
  REQ_BGL = False
2111

    
2112
  def ExpandNames(self):
2113
    self._ExpandAndLockInstance()
2114
    self.needed_locks[locking.LEVEL_NODE] = []
2115
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2116

    
2117
  def DeclareLocks(self, level):
2118
    if level == locking.LEVEL_NODE:
2119
      self._LockInstancesNodes()
2120

    
2121
  def BuildHooksEnv(self):
2122
    """Build hooks env.
2123

2124
    This runs on master, primary and secondary nodes of the instance.
2125

2126
    """
2127
    env = {
2128
      "FORCE": self.op.force,
2129
      }
2130
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2131
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2132
          list(self.instance.secondary_nodes))
2133
    return env, nl, nl
2134

    
2135
  def CheckPrereq(self):
2136
    """Check prerequisites.
2137

2138
    This checks that the instance is in the cluster.
2139

2140
    """
2141
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2142
    assert self.instance is not None, \
2143
      "Cannot retrieve locked instance %s" % self.op.instance_name
2144

    
2145
    # check bridges existance
2146
    _CheckInstanceBridgesExist(instance)
2147

    
2148
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2149
                         "starting instance %s" % instance.name,
2150
                         instance.memory)
2151

    
2152
  def Exec(self, feedback_fn):
2153
    """Start the instance.
2154

2155
    """
2156
    instance = self.instance
2157
    force = self.op.force
2158
    extra_args = getattr(self.op, "extra_args", "")
2159

    
2160
    self.cfg.MarkInstanceUp(instance.name)
2161

    
2162
    node_current = instance.primary_node
2163

    
2164
    _StartInstanceDisks(self.cfg, instance, force)
2165

    
2166
    if not rpc.call_instance_start(node_current, instance, extra_args):
2167
      _ShutdownInstanceDisks(instance, self.cfg)
2168
      raise errors.OpExecError("Could not start instance")
2169

    
2170

    
2171
class LURebootInstance(LogicalUnit):
2172
  """Reboot an instance.
2173

2174
  """
2175
  HPATH = "instance-reboot"
2176
  HTYPE = constants.HTYPE_INSTANCE
2177
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2178
  REQ_BGL = False
2179

    
2180
  def ExpandNames(self):
2181
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2182
                                   constants.INSTANCE_REBOOT_HARD,
2183
                                   constants.INSTANCE_REBOOT_FULL]:
2184
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2185
                                  (constants.INSTANCE_REBOOT_SOFT,
2186
                                   constants.INSTANCE_REBOOT_HARD,
2187
                                   constants.INSTANCE_REBOOT_FULL))
2188
    self._ExpandAndLockInstance()
2189
    self.needed_locks[locking.LEVEL_NODE] = []
2190
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2191

    
2192
  def DeclareLocks(self, level):
2193
    if level == locking.LEVEL_NODE:
2194
      primary_only = not constants.INSTANCE_REBOOT_FULL
2195
      self._LockInstancesNodes(primary_only=primary_only)
2196

    
2197
  def BuildHooksEnv(self):
2198
    """Build hooks env.
2199

2200
    This runs on master, primary and secondary nodes of the instance.
2201

2202
    """
2203
    env = {
2204
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2205
      }
2206
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2207
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2208
          list(self.instance.secondary_nodes))
2209
    return env, nl, nl
2210

    
2211
  def CheckPrereq(self):
2212
    """Check prerequisites.
2213

2214
    This checks that the instance is in the cluster.
2215

2216
    """
2217
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2218
    assert self.instance is not None, \
2219
      "Cannot retrieve locked instance %s" % self.op.instance_name
2220

    
2221
    # check bridges existance
2222
    _CheckInstanceBridgesExist(instance)
2223

    
2224
  def Exec(self, feedback_fn):
2225
    """Reboot the instance.
2226

2227
    """
2228
    instance = self.instance
2229
    ignore_secondaries = self.op.ignore_secondaries
2230
    reboot_type = self.op.reboot_type
2231
    extra_args = getattr(self.op, "extra_args", "")
2232

    
2233
    node_current = instance.primary_node
2234

    
2235
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2236
                       constants.INSTANCE_REBOOT_HARD]:
2237
      if not rpc.call_instance_reboot(node_current, instance,
2238
                                      reboot_type, extra_args):
2239
        raise errors.OpExecError("Could not reboot instance")
2240
    else:
2241
      if not rpc.call_instance_shutdown(node_current, instance):
2242
        raise errors.OpExecError("could not shutdown instance for full reboot")
2243
      _ShutdownInstanceDisks(instance, self.cfg)
2244
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2245
      if not rpc.call_instance_start(node_current, instance, extra_args):
2246
        _ShutdownInstanceDisks(instance, self.cfg)
2247
        raise errors.OpExecError("Could not start instance for full reboot")
2248

    
2249
    self.cfg.MarkInstanceUp(instance.name)
2250

    
2251

    
2252
class LUShutdownInstance(LogicalUnit):
2253
  """Shutdown an instance.
2254

2255
  """
2256
  HPATH = "instance-stop"
2257
  HTYPE = constants.HTYPE_INSTANCE
2258
  _OP_REQP = ["instance_name"]
2259
  REQ_BGL = False
2260

    
2261
  def ExpandNames(self):
2262
    self._ExpandAndLockInstance()
2263
    self.needed_locks[locking.LEVEL_NODE] = []
2264
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2265

    
2266
  def DeclareLocks(self, level):
2267
    if level == locking.LEVEL_NODE:
2268
      self._LockInstancesNodes()
2269

    
2270
  def BuildHooksEnv(self):
2271
    """Build hooks env.
2272

2273
    This runs on master, primary and secondary nodes of the instance.
2274

2275
    """
2276
    env = _BuildInstanceHookEnvByObject(self.instance)
2277
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2278
          list(self.instance.secondary_nodes))
2279
    return env, nl, nl
2280

    
2281
  def CheckPrereq(self):
2282
    """Check prerequisites.
2283

2284
    This checks that the instance is in the cluster.
2285

2286
    """
2287
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2288
    assert self.instance is not None, \
2289
      "Cannot retrieve locked instance %s" % self.op.instance_name
2290

    
2291
  def Exec(self, feedback_fn):
2292
    """Shutdown the instance.
2293

2294
    """
2295
    instance = self.instance
2296
    node_current = instance.primary_node
2297
    self.cfg.MarkInstanceDown(instance.name)
2298
    if not rpc.call_instance_shutdown(node_current, instance):
2299
      logger.Error("could not shutdown instance")
2300

    
2301
    _ShutdownInstanceDisks(instance, self.cfg)
2302

    
2303

    
2304
class LUReinstallInstance(LogicalUnit):
2305
  """Reinstall an instance.
2306

2307
  """
2308
  HPATH = "instance-reinstall"
2309
  HTYPE = constants.HTYPE_INSTANCE
2310
  _OP_REQP = ["instance_name"]
2311
  REQ_BGL = False
2312

    
2313
  def ExpandNames(self):
2314
    self._ExpandAndLockInstance()
2315
    self.needed_locks[locking.LEVEL_NODE] = []
2316
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2317

    
2318
  def DeclareLocks(self, level):
2319
    if level == locking.LEVEL_NODE:
2320
      self._LockInstancesNodes()
2321

    
2322
  def BuildHooksEnv(self):
2323
    """Build hooks env.
2324

2325
    This runs on master, primary and secondary nodes of the instance.
2326

2327
    """
2328
    env = _BuildInstanceHookEnvByObject(self.instance)
2329
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2330
          list(self.instance.secondary_nodes))
2331
    return env, nl, nl
2332

    
2333
  def CheckPrereq(self):
2334
    """Check prerequisites.
2335

2336
    This checks that the instance is in the cluster and is not running.
2337

2338
    """
2339
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2340
    assert instance is not None, \
2341
      "Cannot retrieve locked instance %s" % self.op.instance_name
2342

    
2343
    if instance.disk_template == constants.DT_DISKLESS:
2344
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2345
                                 self.op.instance_name)
2346
    if instance.status != "down":
2347
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2348
                                 self.op.instance_name)
2349
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2350
    if remote_info:
2351
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2352
                                 (self.op.instance_name,
2353
                                  instance.primary_node))
2354

    
2355
    self.op.os_type = getattr(self.op, "os_type", None)
2356
    if self.op.os_type is not None:
2357
      # OS verification
2358
      pnode = self.cfg.GetNodeInfo(
2359
        self.cfg.ExpandNodeName(instance.primary_node))
2360
      if pnode is None:
2361
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2362
                                   self.op.pnode)
2363
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2364
      if not os_obj:
2365
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2366
                                   " primary node"  % self.op.os_type)
2367

    
2368
    self.instance = instance
2369

    
2370
  def Exec(self, feedback_fn):
2371
    """Reinstall the instance.
2372

2373
    """
2374
    inst = self.instance
2375

    
2376
    if self.op.os_type is not None:
2377
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2378
      inst.os = self.op.os_type
2379
      self.cfg.Update(inst)
2380

    
2381
    _StartInstanceDisks(self.cfg, inst, None)
2382
    try:
2383
      feedback_fn("Running the instance OS create scripts...")
2384
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2385
        raise errors.OpExecError("Could not install OS for instance %s"
2386
                                 " on node %s" %
2387
                                 (inst.name, inst.primary_node))
2388
    finally:
2389
      _ShutdownInstanceDisks(inst, self.cfg)
2390

    
2391

    
2392
class LURenameInstance(LogicalUnit):
2393
  """Rename an instance.
2394

2395
  """
2396
  HPATH = "instance-rename"
2397
  HTYPE = constants.HTYPE_INSTANCE
2398
  _OP_REQP = ["instance_name", "new_name"]
2399

    
2400
  def BuildHooksEnv(self):
2401
    """Build hooks env.
2402

2403
    This runs on master, primary and secondary nodes of the instance.
2404

2405
    """
2406
    env = _BuildInstanceHookEnvByObject(self.instance)
2407
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2408
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2409
          list(self.instance.secondary_nodes))
2410
    return env, nl, nl
2411

    
2412
  def CheckPrereq(self):
2413
    """Check prerequisites.
2414

2415
    This checks that the instance is in the cluster and is not running.
2416

2417
    """
2418
    instance = self.cfg.GetInstanceInfo(
2419
      self.cfg.ExpandInstanceName(self.op.instance_name))
2420
    if instance is None:
2421
      raise errors.OpPrereqError("Instance '%s' not known" %
2422
                                 self.op.instance_name)
2423
    if instance.status != "down":
2424
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2425
                                 self.op.instance_name)
2426
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2427
    if remote_info:
2428
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2429
                                 (self.op.instance_name,
2430
                                  instance.primary_node))
2431
    self.instance = instance
2432

    
2433
    # new name verification
2434
    name_info = utils.HostInfo(self.op.new_name)
2435

    
2436
    self.op.new_name = new_name = name_info.name
2437
    instance_list = self.cfg.GetInstanceList()
2438
    if new_name in instance_list:
2439
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2440
                                 new_name)
2441

    
2442
    if not getattr(self.op, "ignore_ip", False):
2443
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2444
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2445
                                   (name_info.ip, new_name))
2446

    
2447

    
2448
  def Exec(self, feedback_fn):
2449
    """Reinstall the instance.
2450

2451
    """
2452
    inst = self.instance
2453
    old_name = inst.name
2454

    
2455
    if inst.disk_template == constants.DT_FILE:
2456
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2457

    
2458
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2459
    # Change the instance lock. This is definitely safe while we hold the BGL
2460
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2461
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2462

    
2463
    # re-read the instance from the configuration after rename
2464
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2465

    
2466
    if inst.disk_template == constants.DT_FILE:
2467
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2468
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
2469
                                                old_file_storage_dir,
2470
                                                new_file_storage_dir)
2471

    
2472
      if not result:
2473
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2474
                                 " directory '%s' to '%s' (but the instance"
2475
                                 " has been renamed in Ganeti)" % (
2476
                                 inst.primary_node, old_file_storage_dir,
2477
                                 new_file_storage_dir))
2478

    
2479
      if not result[0]:
2480
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2481
                                 " (but the instance has been renamed in"
2482
                                 " Ganeti)" % (old_file_storage_dir,
2483
                                               new_file_storage_dir))
2484

    
2485
    _StartInstanceDisks(self.cfg, inst, None)
2486
    try:
2487
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2488
                                          "sda", "sdb"):
2489
        msg = ("Could not run OS rename script for instance %s on node %s"
2490
               " (but the instance has been renamed in Ganeti)" %
2491
               (inst.name, inst.primary_node))
2492
        logger.Error(msg)
2493
    finally:
2494
      _ShutdownInstanceDisks(inst, self.cfg)
2495

    
2496

    
2497
class LURemoveInstance(LogicalUnit):
2498
  """Remove an instance.
2499

2500
  """
2501
  HPATH = "instance-remove"
2502
  HTYPE = constants.HTYPE_INSTANCE
2503
  _OP_REQP = ["instance_name", "ignore_failures"]
2504
  REQ_BGL = False
2505

    
2506
  def ExpandNames(self):
2507
    self._ExpandAndLockInstance()
2508
    self.needed_locks[locking.LEVEL_NODE] = []
2509
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2510

    
2511
  def DeclareLocks(self, level):
2512
    if level == locking.LEVEL_NODE:
2513
      self._LockInstancesNodes()
2514

    
2515
  def BuildHooksEnv(self):
2516
    """Build hooks env.
2517

2518
    This runs on master, primary and secondary nodes of the instance.
2519

2520
    """
2521
    env = _BuildInstanceHookEnvByObject(self.instance)
2522
    nl = [self.sstore.GetMasterNode()]
2523
    return env, nl, nl
2524

    
2525
  def CheckPrereq(self):
2526
    """Check prerequisites.
2527

2528
    This checks that the instance is in the cluster.
2529

2530
    """
2531
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2532
    assert self.instance is not None, \
2533
      "Cannot retrieve locked instance %s" % self.op.instance_name
2534

    
2535
  def Exec(self, feedback_fn):
2536
    """Remove the instance.
2537

2538
    """
2539
    instance = self.instance
2540
    logger.Info("shutting down instance %s on node %s" %
2541
                (instance.name, instance.primary_node))
2542

    
2543
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2544
      if self.op.ignore_failures:
2545
        feedback_fn("Warning: can't shutdown instance")
2546
      else:
2547
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2548
                                 (instance.name, instance.primary_node))
2549

    
2550
    logger.Info("removing block devices for instance %s" % instance.name)
2551

    
2552
    if not _RemoveDisks(instance, self.cfg):
2553
      if self.op.ignore_failures:
2554
        feedback_fn("Warning: can't remove instance's disks")
2555
      else:
2556
        raise errors.OpExecError("Can't remove instance's disks")
2557

    
2558
    logger.Info("removing instance %s out of cluster config" % instance.name)
2559

    
2560
    self.cfg.RemoveInstance(instance.name)
2561
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2562

    
2563

    
2564
class LUQueryInstances(NoHooksLU):
2565
  """Logical unit for querying instances.
2566

2567
  """
2568
  _OP_REQP = ["output_fields", "names"]
2569
  REQ_BGL = False
2570

    
2571
  def ExpandNames(self):
2572
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2573
    self.static_fields = frozenset([
2574
      "name", "os", "pnode", "snodes",
2575
      "admin_state", "admin_ram",
2576
      "disk_template", "ip", "mac", "bridge",
2577
      "sda_size", "sdb_size", "vcpus", "tags",
2578
      "network_port", "kernel_path", "initrd_path",
2579
      "hvm_boot_order", "hvm_acpi", "hvm_pae",
2580
      "hvm_cdrom_image_path", "hvm_nic_type",
2581
      "hvm_disk_type", "vnc_bind_address",
2582
      "serial_no",
2583
      ])
2584
    _CheckOutputFields(static=self.static_fields,
2585
                       dynamic=self.dynamic_fields,
2586
                       selected=self.op.output_fields)
2587

    
2588
    self.needed_locks = {}
2589
    self.share_locks[locking.LEVEL_INSTANCE] = 1
2590
    self.share_locks[locking.LEVEL_NODE] = 1
2591

    
2592
    if self.op.names:
2593
      self.wanted = _GetWantedInstances(self, self.op.names)
2594
    else:
2595
      self.wanted = locking.ALL_SET
2596

    
2597
    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
2598
    if self.do_locking:
2599
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2600
      self.needed_locks[locking.LEVEL_NODE] = []
2601
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2602

    
2603
  def DeclareLocks(self, level):
2604
    if level == locking.LEVEL_NODE and self.do_locking:
2605
      self._LockInstancesNodes()
2606

    
2607
  def CheckPrereq(self):
2608
    """Check prerequisites.
2609

2610
    """
2611
    pass
2612

    
2613
  def Exec(self, feedback_fn):
2614
    """Computes the list of nodes and their attributes.
2615

2616
    """
2617
    all_info = self.cfg.GetAllInstancesInfo()
2618
    if self.do_locking:
2619
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2620
    elif self.wanted != locking.ALL_SET:
2621
      instance_names = self.wanted
2622
      missing = set(instance_names).difference(all_info.keys())
2623
      if missing:
2624
        raise self.OpExecError(
2625
          "Some instances were removed before retrieving their data: %s"
2626
          % missing)
2627
    else:
2628
      instance_names = all_info.keys()
2629
    instance_list = [all_info[iname] for iname in instance_names]
2630

    
2631
    # begin data gathering
2632

    
2633
    nodes = frozenset([inst.primary_node for inst in instance_list])
2634

    
2635
    bad_nodes = []
2636
    if self.dynamic_fields.intersection(self.op.output_fields):
2637
      live_data = {}
2638
      node_data = rpc.call_all_instances_info(nodes)
2639
      for name in nodes:
2640
        result = node_data[name]
2641
        if result:
2642
          live_data.update(result)
2643
        elif result == False:
2644
          bad_nodes.append(name)
2645
        # else no instance is alive
2646
    else:
2647
      live_data = dict([(name, {}) for name in instance_names])
2648

    
2649
    # end data gathering
2650

    
2651
    output = []
2652
    for instance in instance_list:
2653
      iout = []
2654
      for field in self.op.output_fields:
2655
        if field == "name":
2656
          val = instance.name
2657
        elif field == "os":
2658
          val = instance.os
2659
        elif field == "pnode":
2660
          val = instance.primary_node
2661
        elif field == "snodes":
2662
          val = list(instance.secondary_nodes)
2663
        elif field == "admin_state":
2664
          val = (instance.status != "down")
2665
        elif field == "oper_state":
2666
          if instance.primary_node in bad_nodes:
2667
            val = None
2668
          else:
2669
            val = bool(live_data.get(instance.name))
2670
        elif field == "status":
2671
          if instance.primary_node in bad_nodes:
2672
            val = "ERROR_nodedown"
2673
          else:
2674
            running = bool(live_data.get(instance.name))
2675
            if running:
2676
              if instance.status != "down":
2677
                val = "running"
2678
              else:
2679
                val = "ERROR_up"
2680
            else:
2681
              if instance.status != "down":
2682
                val = "ERROR_down"
2683
              else:
2684
                val = "ADMIN_down"
2685
        elif field == "admin_ram":
2686
          val = instance.memory
2687
        elif field == "oper_ram":
2688
          if instance.primary_node in bad_nodes:
2689
            val = None
2690
          elif instance.name in live_data:
2691
            val = live_data[instance.name].get("memory", "?")
2692
          else:
2693
            val = "-"
2694
        elif field == "disk_template":
2695
          val = instance.disk_template
2696
        elif field == "ip":
2697
          val = instance.nics[0].ip
2698
        elif field == "bridge":
2699
          val = instance.nics[0].bridge
2700
        elif field == "mac":
2701
          val = instance.nics[0].mac
2702
        elif field == "sda_size" or field == "sdb_size":
2703
          disk = instance.FindDisk(field[:3])
2704
          if disk is None:
2705
            val = None
2706
          else:
2707
            val = disk.size
2708
        elif field == "vcpus":
2709
          val = instance.vcpus
2710
        elif field == "tags":
2711
          val = list(instance.GetTags())
2712
        elif field == "serial_no":
2713
          val = instance.serial_no
2714
        elif field in ("network_port", "kernel_path", "initrd_path",
2715
                       "hvm_boot_order", "hvm_acpi", "hvm_pae",
2716
                       "hvm_cdrom_image_path", "hvm_nic_type",
2717
                       "hvm_disk_type", "vnc_bind_address"):
2718
          val = getattr(instance, field, None)
2719
          if val is not None:
2720
            pass
2721
          elif field in ("hvm_nic_type", "hvm_disk_type",
2722
                         "kernel_path", "initrd_path"):
2723
            val = "default"
2724
          else:
2725
            val = "-"
2726
        else:
2727
          raise errors.ParameterError(field)
2728
        iout.append(val)
2729
      output.append(iout)
2730

    
2731
    return output
2732

    
2733

    
2734
class LUFailoverInstance(LogicalUnit):
2735
  """Failover an instance.
2736

2737
  """
2738
  HPATH = "instance-failover"
2739
  HTYPE = constants.HTYPE_INSTANCE
2740
  _OP_REQP = ["instance_name", "ignore_consistency"]
2741
  REQ_BGL = False
2742

    
2743
  def ExpandNames(self):
2744
    self._ExpandAndLockInstance()
2745
    self.needed_locks[locking.LEVEL_NODE] = []
2746
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2747

    
2748
  def DeclareLocks(self, level):
2749
    if level == locking.LEVEL_NODE:
2750
      self._LockInstancesNodes()
2751

    
2752
  def BuildHooksEnv(self):
2753
    """Build hooks env.
2754

2755
    This runs on master, primary and secondary nodes of the instance.
2756

2757
    """
2758
    env = {
2759
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2760
      }
2761
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2762
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2763
    return env, nl, nl
2764

    
2765
  def CheckPrereq(self):
2766
    """Check prerequisites.
2767

2768
    This checks that the instance is in the cluster.
2769

2770
    """
2771
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2772
    assert self.instance is not None, \
2773
      "Cannot retrieve locked instance %s" % self.op.instance_name
2774

    
2775
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2776
      raise errors.OpPrereqError("Instance's disk layout is not"
2777
                                 " network mirrored, cannot failover.")
2778

    
2779
    secondary_nodes = instance.secondary_nodes
2780
    if not secondary_nodes:
2781
      raise errors.ProgrammerError("no secondary node but using "
2782
                                   "a mirrored disk template")
2783

    
2784
    target_node = secondary_nodes[0]
2785
    # check memory requirements on the secondary node
2786
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2787
                         instance.name, instance.memory)
2788

    
2789
    # check bridge existance
2790
    brlist = [nic.bridge for nic in instance.nics]
2791
    if not rpc.call_bridges_exist(target_node, brlist):
2792
      raise errors.OpPrereqError("One or more target bridges %s does not"
2793
                                 " exist on destination node '%s'" %
2794
                                 (brlist, target_node))
2795

    
2796
  def Exec(self, feedback_fn):
2797
    """Failover an instance.
2798

2799
    The failover is done by shutting it down on its present node and
2800
    starting it on the secondary.
2801

2802
    """
2803
    instance = self.instance
2804

    
2805
    source_node = instance.primary_node
2806
    target_node = instance.secondary_nodes[0]
2807

    
2808
    feedback_fn("* checking disk consistency between source and target")
2809
    for dev in instance.disks:
2810
      # for drbd, these are drbd over lvm
2811
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2812
        if instance.status == "up" and not self.op.ignore_consistency:
2813
          raise errors.OpExecError("Disk %s is degraded on target node,"
2814
                                   " aborting failover." % dev.iv_name)
2815

    
2816
    feedback_fn("* shutting down instance on source node")
2817
    logger.Info("Shutting down instance %s on node %s" %
2818
                (instance.name, source_node))
2819

    
2820
    if not rpc.call_instance_shutdown(source_node, instance):
2821
      if self.op.ignore_consistency:
2822
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2823
                     " anyway. Please make sure node %s is down"  %
2824
                     (instance.name, source_node, source_node))
2825
      else:
2826
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2827
                                 (instance.name, source_node))
2828

    
2829
    feedback_fn("* deactivating the instance's disks on source node")
2830
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2831
      raise errors.OpExecError("Can't shut down the instance's disks.")
2832

    
2833
    instance.primary_node = target_node
2834
    # distribute new instance config to the other nodes
2835
    self.cfg.Update(instance)
2836

    
2837
    # Only start the instance if it's marked as up
2838
    if instance.status == "up":
2839
      feedback_fn("* activating the instance's disks on target node")
2840
      logger.Info("Starting instance %s on node %s" %
2841
                  (instance.name, target_node))
2842

    
2843
      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2844
                                               ignore_secondaries=True)
2845
      if not disks_ok:
2846
        _ShutdownInstanceDisks(instance, self.cfg)
2847
        raise errors.OpExecError("Can't activate the instance's disks")
2848

    
2849
      feedback_fn("* starting the instance on the target node")
2850
      if not rpc.call_instance_start(target_node, instance, None):
2851
        _ShutdownInstanceDisks(instance, self.cfg)
2852
        raise errors.OpExecError("Could not start instance %s on node %s." %
2853
                                 (instance.name, target_node))
2854

    
2855

    
2856
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2857
  """Create a tree of block devices on the primary node.
2858

2859
  This always creates all devices.
2860

2861
  """
2862
  if device.children:
2863
    for child in device.children:
2864
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2865
        return False
2866

    
2867
  cfg.SetDiskID(device, node)
2868
  new_id = rpc.call_blockdev_create(node, device, device.size,
2869
                                    instance.name, True, info)
2870
  if not new_id:
2871
    return False
2872
  if device.physical_id is None:
2873
    device.physical_id = new_id
2874
  return True
2875

    
2876

    
2877
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2878
  """Create a tree of block devices on a secondary node.
2879

2880
  If this device type has to be created on secondaries, create it and
2881
  all its children.
2882

2883
  If not, just recurse to children keeping the same 'force' value.
2884

2885
  """
2886
  if device.CreateOnSecondary():
2887
    force = True
2888
  if device.children:
2889
    for child in device.children:
2890
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2891
                                        child, force, info):
2892
        return False
2893

    
2894
  if not force:
2895
    return True
2896
  cfg.SetDiskID(device, node)
2897
  new_id = rpc.call_blockdev_create(node, device, device.size,
2898
                                    instance.name, False, info)
2899
  if not new_id:
2900
    return False
2901
  if device.physical_id is None:
2902
    device.physical_id = new_id
2903
  return True
2904

    
2905

    
2906
def _GenerateUniqueNames(cfg, exts):
2907
  """Generate a suitable LV name.
2908

2909
  This will generate a logical volume name for the given instance.
2910

2911
  """
2912
  results = []
2913
  for val in exts:
2914
    new_id = cfg.GenerateUniqueID()
2915
    results.append("%s%s" % (new_id, val))
2916
  return results
2917

    
2918

    
2919
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name,
2920
                         p_minor, s_minor):
2921
  """Generate a drbd8 device complete with its children.
2922

2923
  """
2924
  port = cfg.AllocatePort()
2925
  vgname = cfg.GetVGName()
2926
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2927
                          logical_id=(vgname, names[0]))
2928
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2929
                          logical_id=(vgname, names[1]))
2930
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2931
                          logical_id=(primary, secondary, port,
2932
                                      p_minor, s_minor),
2933
                          children=[dev_data, dev_meta],
2934
                          iv_name=iv_name)
2935
  return drbd_dev
2936

    
2937

    
2938
def _GenerateDiskTemplate(cfg, template_name,
2939
                          instance_name, primary_node,
2940
                          secondary_nodes, disk_sz, swap_sz,
2941
                          file_storage_dir, file_driver):
2942
  """Generate the entire disk layout for a given template type.
2943

2944
  """
2945
  #TODO: compute space requirements
2946

    
2947
  vgname = cfg.GetVGName()
2948
  if template_name == constants.DT_DISKLESS:
2949
    disks = []
2950
  elif template_name == constants.DT_PLAIN:
2951
    if len(secondary_nodes) != 0:
2952
      raise errors.ProgrammerError("Wrong template configuration")
2953

    
2954
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2955
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2956
                           logical_id=(vgname, names[0]),
2957
                           iv_name = "sda")
2958
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2959
                           logical_id=(vgname, names[1]),
2960
                           iv_name = "sdb")
2961
    disks = [sda_dev, sdb_dev]
2962
  elif template_name == constants.DT_DRBD8:
2963
    if len(secondary_nodes) != 1:
2964
      raise errors.ProgrammerError("Wrong template configuration")
2965
    remote_node = secondary_nodes[0]
2966
    (minor_pa, minor_pb,
2967
     minor_sa, minor_sb) = cfg.AllocateDRBDMinor(
2968
      [primary_node, primary_node, remote_node, remote_node], instance_name)
2969

    
2970
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2971
                                       ".sdb_data", ".sdb_meta"])
2972
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2973
                                        disk_sz, names[0:2], "sda",
2974
                                        minor_pa, minor_sa)
2975
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2976
                                        swap_sz, names[2:4], "sdb",
2977
                                        minor_pb, minor_sb)
2978
    disks = [drbd_sda_dev, drbd_sdb_dev]
2979
  elif template_name == constants.DT_FILE:
2980
    if len(secondary_nodes) != 0:
2981
      raise errors.ProgrammerError("Wrong template configuration")
2982

    
2983
    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2984
                                iv_name="sda", logical_id=(file_driver,
2985
                                "%s/sda" % file_storage_dir))
2986
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
2987
                                iv_name="sdb", logical_id=(file_driver,
2988
                                "%s/sdb" % file_storage_dir))
2989
    disks = [file_sda_dev, file_sdb_dev]
2990
  else:
2991
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2992
  return disks
2993

    
2994

    
2995
def _GetInstanceInfoText(instance):
2996
  """Compute that text that should be added to the disk's metadata.
2997

2998
  """
2999
  return "originstname+%s" % instance.name
3000

    
3001

    
3002
def _CreateDisks(cfg, instance):
3003
  """Create all disks for an instance.
3004

3005
  This abstracts away some work from AddInstance.
3006

3007
  Args:
3008
    instance: the instance object
3009

3010
  Returns:
3011
    True or False showing the success of the creation process
3012

3013
  """
3014
  info = _GetInstanceInfoText(instance)
3015

    
3016
  if instance.disk_template == constants.DT_FILE:
3017
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3018
    result = rpc.call_file_storage_dir_create(instance.primary_node,
3019
                                              file_storage_dir)
3020

    
3021
    if not result:
3022
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
3023
      return False
3024

    
3025
    if not result[0]:
3026
      logger.Error("failed to create directory '%s'" % file_storage_dir)
3027
      return False
3028

    
3029
  for device in instance.disks:
3030
    logger.Info("creating volume %s for instance %s" %
3031
                (device.iv_name, instance.name))
3032
    #HARDCODE
3033
    for secondary_node in instance.secondary_nodes:
3034
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
3035
                                        device, False, info):
3036
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
3037
                     (device.iv_name, device, secondary_node))
3038
        return False
3039
    #HARDCODE
3040
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3041
                                    instance, device, info):
3042
      logger.Error("failed to create volume %s on primary!" %
3043
                   device.iv_name)
3044
      return False
3045

    
3046
  return True
3047

    
3048

    
3049
def _RemoveDisks(instance, cfg):
3050
  """Remove all disks for an instance.
3051

3052
  This abstracts away some work from `AddInstance()` and
3053
  `RemoveInstance()`. Note that in case some of the devices couldn't
3054
  be removed, the removal will continue with the other ones (compare
3055
  with `_CreateDisks()`).
3056

3057
  Args:
3058
    instance: the instance object
3059

3060
  Returns:
3061
    True or False showing the success of the removal proces
3062

3063
  """
3064
  logger.Info("removing block devices for instance %s" % instance.name)
3065

    
3066
  result = True
3067
  for device in instance.disks:
3068
    for node, disk in device.ComputeNodeTree(instance.primary_node):
3069
      cfg.SetDiskID(disk, node)
3070
      if not rpc.call_blockdev_remove(node, disk):
3071
        logger.Error("could not remove block device %s on node %s,"
3072
                     " continuing anyway" %
3073
                     (device.iv_name, node))
3074
        result = False
3075

    
3076
  if instance.disk_template == constants.DT_FILE:
3077
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3078
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
3079
                                            file_storage_dir):
3080
      logger.Error("could not remove directory '%s'" % file_storage_dir)
3081
      result = False
3082

    
3083
  return result
3084

    
3085

    
3086
def _ComputeDiskSize(disk_template, disk_size, swap_size):
3087
  """Compute disk size requirements in the volume group
3088

3089
  This is currently hard-coded for the two-drive layout.
3090

3091
  """
3092
  # Required free disk space as a function of disk and swap space
3093
  req_size_dict = {
3094
    constants.DT_DISKLESS: None,
3095
    constants.DT_PLAIN: disk_size + swap_size,
3096
    # 256 MB are added for drbd metadata, 128MB for each drbd device
3097
    constants.DT_DRBD8: disk_size + swap_size + 256,
3098
    constants.DT_FILE: None,
3099
  }
3100

    
3101
  if disk_template not in req_size_dict:
3102
    raise errors.ProgrammerError("Disk template '%s' size requirement"
3103
                                 " is unknown" %  disk_template)
3104

    
3105
  return req_size_dict[disk_template]
3106

    
3107

    
3108
class LUCreateInstance(LogicalUnit):
3109
  """Create an instance.
3110

3111
  """
3112
  HPATH = "instance-add"
3113
  HTYPE = constants.HTYPE_INSTANCE
3114
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
3115
              "disk_template", "swap_size", "mode", "start", "vcpus",
3116
              "wait_for_sync", "ip_check", "mac"]
3117
  REQ_BGL = False
3118

    
3119
  def _ExpandNode(self, node):
3120
    """Expands and checks one node name.
3121

3122
    """
3123
    node_full = self.cfg.ExpandNodeName(node)
3124
    if node_full is None:
3125
      raise errors.OpPrereqError("Unknown node %s" % node)
3126
    return node_full
3127

    
3128
  def ExpandNames(self):
3129
    """ExpandNames for CreateInstance.
3130

3131
    Figure out the right locks for instance creation.
3132

3133
    """
3134
    self.needed_locks = {}
3135

    
3136
    # set optional parameters to none if they don't exist
3137
    for attr in ["kernel_path", "initrd_path", "pnode", "snode",
3138
                 "iallocator", "hvm_boot_order", "hvm_acpi", "hvm_pae",
3139
                 "hvm_cdrom_image_path", "hvm_nic_type", "hvm_disk_type",
3140
                 "vnc_bind_address"]:
3141
      if not hasattr(self.op, attr):
3142
        setattr(self.op, attr, None)
3143

    
3144
    # verify creation mode
3145
    if self.op.mode not in (constants.INSTANCE_CREATE,
3146
                            constants.INSTANCE_IMPORT):
3147
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3148
                                 self.op.mode)
3149
    # disk template and mirror node verification
3150
    if self.op.disk_template not in constants.DISK_TEMPLATES:
3151
      raise errors.OpPrereqError("Invalid disk template name")
3152

    
3153
    #### instance parameters check
3154

    
3155
    # instance name verification
3156
    hostname1 = utils.HostInfo(self.op.instance_name)
3157
    self.op.instance_name = instance_name = hostname1.name
3158

    
3159
    # this is just a preventive check, but someone might still add this
3160
    # instance in the meantime, and creation will fail at lock-add time
3161
    if instance_name in self.cfg.GetInstanceList():
3162
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3163
                                 instance_name)
3164

    
3165
    self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3166

    
3167
    # ip validity checks
3168
    ip = getattr(self.op, "ip", None)
3169
    if ip is None or ip.lower() == "none":
3170
      inst_ip = None
3171
    elif ip.lower() == "auto":
3172
      inst_ip = hostname1.ip
3173
    else:
3174
      if not utils.IsValidIP(ip):
3175
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
3176
                                   " like a valid IP" % ip)
3177
      inst_ip = ip
3178
    self.inst_ip = self.op.ip = inst_ip
3179
    # used in CheckPrereq for ip ping check
3180
    self.check_ip = hostname1.ip
3181

    
3182
    # MAC address verification
3183
    if self.op.mac != "auto":
3184
      if not utils.IsValidMac(self.op.mac.lower()):
3185
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
3186
                                   self.op.mac)
3187

    
3188
    # boot order verification
3189
    if self.op.hvm_boot_order is not None:
3190
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3191
        raise errors.OpPrereqError("invalid boot order specified,"
3192
                                   " must be one or more of [acdn]")
3193
    # file storage checks
3194
    if (self.op.file_driver and
3195
        not self.op.file_driver in constants.FILE_DRIVER):
3196
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3197
                                 self.op.file_driver)
3198

    
3199
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3200
      raise errors.OpPrereqError("File storage directory path not absolute")
3201

    
3202
    ### Node/iallocator related checks
3203
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
3204
      raise errors.OpPrereqError("One and only one of iallocator and primary"
3205
                                 " node must be given")
3206

    
3207
    if self.op.iallocator:
3208
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3209
    else:
3210
      self.op.pnode = self._ExpandNode(self.op.pnode)
3211
      nodelist = [self.op.pnode]
3212
      if self.op.snode is not None:
3213
        self.op.snode = self._ExpandNode(self.op.snode)
3214
        nodelist.append(self.op.snode)
3215
      self.needed_locks[locking.LEVEL_NODE] = nodelist
3216

    
3217
    # in case of import lock the source node too
3218
    if self.op.mode == constants.INSTANCE_IMPORT:
3219
      src_node = getattr(self.op, "src_node", None)
3220
      src_path = getattr(self.op, "src_path", None)
3221

    
3222
      if src_node is None or src_path is None:
3223
        raise errors.OpPrereqError("Importing an instance requires source"
3224
                                   " node and path options")
3225

    
3226
      if not os.path.isabs(src_path):
3227
        raise errors.OpPrereqError("The source path must be absolute")
3228

    
3229
      self.op.src_node = src_node = self._ExpandNode(src_node)
3230
      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3231
        self.needed_locks[locking.LEVEL_NODE].append(src_node)
3232

    
3233
    else: # INSTANCE_CREATE
3234
      if getattr(self.op, "os_type", None) is None:
3235
        raise errors.OpPrereqError("No guest OS specified")
3236

    
3237
  def _RunAllocator(self):
3238
    """Run the allocator based on input opcode.
3239

3240
    """
3241
    disks = [{"size": self.op.disk_size, "mode": "w"},
3242
             {"size": self.op.swap_size, "mode": "w"}]
3243
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3244
             "bridge": self.op.bridge}]
3245
    ial = IAllocator(self.cfg, self.sstore,
3246
                     mode=constants.IALLOCATOR_MODE_ALLOC,
3247
                     name=self.op.instance_name,
3248
                     disk_template=self.op.disk_template,
3249
                     tags=[],
3250
                     os=self.op.os_type,
3251
                     vcpus=self.op.vcpus,
3252
                     mem_size=self.op.mem_size,
3253
                     disks=disks,
3254
                     nics=nics,
3255
                     )
3256

    
3257
    ial.Run(self.op.iallocator)
3258

    
3259
    if not ial.success:
3260
      raise errors.OpPrereqError("Can't compute nodes using"
3261
                                 " iallocator '%s': %s" % (self.op.iallocator,
3262
                                                           ial.info))
3263
    if len(ial.nodes) != ial.required_nodes:
3264
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3265
                                 " of nodes (%s), required %s" %
3266
                                 (self.op.iallocator, len(ial.nodes),
3267
                                  ial.required_nodes))
3268
    self.op.pnode = ial.nodes[0]
3269
    logger.ToStdout("Selected nodes for the instance: %s" %
3270
                    (", ".join(ial.nodes),))
3271
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3272
                (self.op.instance_name, self.op.iallocator, ial.nodes))
3273
    if ial.required_nodes == 2:
3274
      self.op.snode = ial.nodes[1]
3275

    
3276
  def BuildHooksEnv(self):
3277
    """Build hooks env.
3278

3279
    This runs on master, primary and secondary nodes of the instance.
3280

3281
    """
3282
    env = {
3283
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3284
      "INSTANCE_DISK_SIZE": self.op.disk_size,
3285
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
3286
      "INSTANCE_ADD_MODE": self.op.mode,
3287
      }
3288
    if self.op.mode == constants.INSTANCE_IMPORT:
3289
      env["INSTANCE_SRC_NODE"] = self.op.src_node
3290
      env["INSTANCE_SRC_PATH"] = self.op.src_path
3291
      env["INSTANCE_SRC_IMAGE"] = self.src_image
3292

    
3293
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3294
      primary_node=self.op.pnode,
3295
      secondary_nodes=self.secondaries,
3296
      status=self.instance_status,
3297
      os_type=self.op.os_type,
3298
      memory=self.op.mem_size,
3299
      vcpus=self.op.vcpus,
3300
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3301
    ))
3302

    
3303
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3304
          self.secondaries)
3305
    return env, nl, nl
3306

    
3307

    
3308
  def CheckPrereq(self):
3309
    """Check prerequisites.
3310

3311
    """
3312
    if (not self.cfg.GetVGName() and
3313
        self.op.disk_template not in constants.DTS_NOT_LVM):
3314
      raise errors.OpPrereqError("Cluster does not support lvm-based"
3315
                                 " instances")
3316

    
3317
    if self.op.mode == constants.INSTANCE_IMPORT:
3318
      src_node = self.op.src_node
3319
      src_path = self.op.src_path
3320

    
3321
      export_info = rpc.call_export_info(src_node, src_path)
3322

    
3323
      if not export_info:
3324
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
3325

    
3326
      if not export_info.has_section(constants.INISECT_EXP):
3327
        raise errors.ProgrammerError("Corrupted export config")
3328

    
3329
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
3330
      if (int(ei_version) != constants.EXPORT_VERSION):
3331
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3332
                                   (ei_version, constants.EXPORT_VERSION))
3333

    
3334
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3335
        raise errors.OpPrereqError("Can't import instance with more than"
3336
                                   " one data disk")
3337

    
3338
      # FIXME: are the old os-es, disk sizes, etc. useful?
3339
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3340
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3341
                                                         'disk0_dump'))
3342
      self.src_image = diskimage
3343

    
3344
    # ip ping checks (we use the same ip that was resolved in ExpandNames)
3345

    
3346
    if self.op.start and not self.op.ip_check:
3347
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3348
                                 " adding an instance in start mode")
3349

    
3350
    if self.op.ip_check:
3351
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3352
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3353
                                   (self.check_ip, instance_name))
3354

    
3355
    # bridge verification
3356
    bridge = getattr(self.op, "bridge", None)
3357
    if bridge is None:
3358
      self.op.bridge = self.cfg.GetDefBridge()
3359
    else:
3360
      self.op.bridge = bridge
3361

    
3362
    #### allocator run
3363

    
3364
    if self.op.iallocator is not None:
3365
      self._RunAllocator()
3366

    
3367
    #### node related checks
3368

    
3369
    # check primary node
3370
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3371
    assert self.pnode is not None, \
3372
      "Cannot retrieve locked node %s" % self.op.pnode
3373
    self.secondaries = []
3374

    
3375
    # mirror node verification
3376
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3377
      if self.op.snode is None:
3378
        raise errors.OpPrereqError("The networked disk templates need"
3379
                                   " a mirror node")
3380
      if self.op.snode == pnode.name:
3381
        raise errors.OpPrereqError("The secondary node cannot be"
3382
                                   " the primary node.")
3383
      self.secondaries.append(self.op.snode)
3384

    
3385
    req_size = _ComputeDiskSize(self.op.disk_template,
3386
                                self.op.disk_size, self.op.swap_size)
3387

    
3388
    # Check lv size requirements
3389
    if req_size is not None:
3390
      nodenames = [pnode.name] + self.secondaries
3391
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3392
      for node in nodenames:
3393
        info = nodeinfo.get(node, None)
3394
        if not info:
3395
          raise errors.OpPrereqError("Cannot get current information"
3396
                                     " from node '%s'" % node)
3397
        vg_free = info.get('vg_free', None)
3398
        if not isinstance(vg_free, int):
3399
          raise errors.OpPrereqError("Can't compute free disk space on"
3400
                                     " node %s" % node)
3401
        if req_size > info['vg_free']:
3402
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3403
                                     " %d MB available, %d MB required" %
3404
                                     (node, info['vg_free'], req_size))
3405

    
3406
    # os verification
3407
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3408
    if not os_obj:
3409
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3410
                                 " primary node"  % self.op.os_type)
3411

    
3412
    if self.op.kernel_path == constants.VALUE_NONE:
3413
      raise errors.OpPrereqError("Can't set instance kernel to none")
3414

    
3415
    # bridge check on primary node
3416
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3417
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3418
                                 " destination node '%s'" %
3419
                                 (self.op.bridge, pnode.name))
3420

    
3421
    # memory check on primary node
3422
    if self.op.start:
3423
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3424
                           "creating instance %s" % self.op.instance_name,
3425
                           self.op.mem_size)
3426

    
3427
    # hvm_cdrom_image_path verification
3428
    if self.op.hvm_cdrom_image_path is not None:
3429
      # FIXME (als): shouldn't these checks happen on the destination node?
3430
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
3431
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
3432
                                   " be an absolute path or None, not %s" %
3433
                                   self.op.hvm_cdrom_image_path)
3434
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
3435
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
3436
                                   " regular file or a symlink pointing to"
3437
                                   " an existing regular file, not %s" %
3438
                                   self.op.hvm_cdrom_image_path)
3439

    
3440
    # vnc_bind_address verification
3441
    if self.op.vnc_bind_address is not None:
3442
      if not utils.IsValidIP(self.op.vnc_bind_address):
3443
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3444
                                   " like a valid IP address" %
3445
                                   self.op.vnc_bind_address)
3446

    
3447
    # Xen HVM device type checks
3448
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
3449
      if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
3450
        raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
3451
                                   " hypervisor" % self.op.hvm_nic_type)
3452
      if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
3453
        raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
3454
                                   " hypervisor" % self.op.hvm_disk_type)
3455

    
3456
    if self.op.start:
3457
      self.instance_status = 'up'
3458
    else:
3459
      self.instance_status = 'down'
3460

    
3461
  def Exec(self, feedback_fn):
3462
    """Create and add the instance to the cluster.
3463

3464
    """
3465
    instance = self.op.instance_name
3466
    pnode_name = self.pnode.name
3467

    
3468
    if self.op.mac == "auto":
3469
      mac_address = self.cfg.GenerateMAC()
3470
    else:
3471
      mac_address = self.op.mac
3472

    
3473
    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3474
    if self.inst_ip is not None:
3475
      nic.ip = self.inst_ip
3476

    
3477
    ht_kind = self.sstore.GetHypervisorType()
3478
    if ht_kind in constants.HTS_REQ_PORT:
3479
      network_port = self.cfg.AllocatePort()
3480
    else:
3481
      network_port = None
3482

    
3483
    if self.op.vnc_bind_address is None:
3484
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3485

    
3486
    # this is needed because os.path.join does not accept None arguments
3487
    if self.op.file_storage_dir is None:
3488
      string_file_storage_dir = ""
3489
    else:
3490
      string_file_storage_dir = self.op.file_storage_dir
3491

    
3492
    # build the full file storage dir path
3493
    file_storage_dir = os.path.normpath(os.path.join(
3494
                                        self.sstore.GetFileStorageDir(),
3495
                                        string_file_storage_dir, instance))
3496

    
3497

    
3498
    disks = _GenerateDiskTemplate(self.cfg,
3499
                                  self.op.disk_template,
3500
                                  instance, pnode_name,
3501
                                  self.secondaries, self.op.disk_size,
3502
                                  self.op.swap_size,
3503
                                  file_storage_dir,
3504
                                  self.op.file_driver)
3505

    
3506
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3507
                            primary_node=pnode_name,
3508
                            memory=self.op.mem_size,
3509
                            vcpus=self.op.vcpus,
3510
                            nics=[nic], disks=disks,
3511
                            disk_template=self.op.disk_template,
3512
                            status=self.instance_status,
3513
                            network_port=network_port,
3514
                            kernel_path=self.op.kernel_path,
3515
                            initrd_path=self.op.initrd_path,
3516
                            hvm_boot_order=self.op.hvm_boot_order,
3517
                            hvm_acpi=self.op.hvm_acpi,
3518
                            hvm_pae=self.op.hvm_pae,
3519
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3520
                            vnc_bind_address=self.op.vnc_bind_address,
3521
                            hvm_nic_type=self.op.hvm_nic_type,
3522
                            hvm_disk_type=self.op.hvm_disk_type,
3523
                            )
3524

    
3525
    feedback_fn("* creating instance disks...")
3526
    if not _CreateDisks(self.cfg, iobj):
3527
      _RemoveDisks(iobj, self.cfg)
3528
      self.cfg.ReleaseDRBDMinors(instance)
3529
      raise errors.OpExecError("Device creation failed, reverting...")
3530

    
3531
    feedback_fn("adding instance %s to cluster config" % instance)
3532

    
3533
    self.cfg.AddInstance(iobj)
3534
    # Declare that we don't want to remove the instance lock anymore, as we've
3535
    # added the instance to the config
3536
    del self.remove_locks[locking.LEVEL_INSTANCE]
3537
    # Remove the temp. assignements for the instance's drbds
3538
    self.cfg.ReleaseDRBDMinors(instance)
3539

    
3540
    if self.op.wait_for_sync:
3541
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3542
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3543
      # make sure the disks are not degraded (still sync-ing is ok)
3544
      time.sleep(15)
3545
      feedback_fn("* checking mirrors status")
3546
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3547
    else:
3548
      disk_abort = False
3549

    
3550
    if disk_abort:
3551
      _RemoveDisks(iobj, self.cfg)
3552
      self.cfg.RemoveInstance(iobj.name)
3553
      # Make sure the instance lock gets removed
3554
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
3555
      raise errors.OpExecError("There are some degraded disks for"
3556
                               " this instance")
3557

    
3558
    feedback_fn("creating os for instance %s on node %s" %
3559
                (instance, pnode_name))
3560

    
3561
    if iobj.disk_template != constants.DT_DISKLESS:
3562
      if self.op.mode == constants.INSTANCE_CREATE:
3563
        feedback_fn("* running the instance OS create scripts...")
3564
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3565
          raise errors.OpExecError("could not add os for instance %s"
3566
                                   " on node %s" %
3567
                                   (instance, pnode_name))
3568

    
3569
      elif self.op.mode == constants.INSTANCE_IMPORT:
3570
        feedback_fn("* running the instance OS import scripts...")
3571
        src_node = self.op.src_node
3572
        src_image = self.src_image
3573
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3574
                                                src_node, src_image):
3575
          raise errors.OpExecError("Could not import os for instance"
3576
                                   " %s on node %s" %
3577
                                   (instance, pnode_name))
3578
      else:
3579
        # also checked in the prereq part
3580
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3581
                                     % self.op.mode)
3582

    
3583
    if self.op.start:
3584
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3585
      feedback_fn("* starting instance...")
3586
      if not rpc.call_instance_start(pnode_name, iobj, None):
3587
        raise errors.OpExecError("Could not start instance")
3588

    
3589

    
3590
class LUConnectConsole(NoHooksLU):
3591
  """Connect to an instance's console.
3592

3593
  This is somewhat special in that it returns the command line that
3594
  you need to run on the master node in order to connect to the
3595
  console.
3596

3597
  """
3598
  _OP_REQP = ["instance_name"]
3599
  REQ_BGL = False
3600

    
3601
  def ExpandNames(self):
3602
    self._ExpandAndLockInstance()
3603

    
3604
  def CheckPrereq(self):
3605
    """Check prerequisites.
3606

3607
    This checks that the instance is in the cluster.
3608

3609
    """
3610
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3611
    assert self.instance is not None, \
3612
      "Cannot retrieve locked instance %s" % self.op.instance_name
3613

    
3614
  def Exec(self, feedback_fn):
3615
    """Connect to the console of an instance
3616

3617
    """
3618
    instance = self.instance
3619
    node = instance.primary_node
3620

    
3621
    node_insts = rpc.call_instance_list([node])[node]
3622
    if node_insts is False:
3623
      raise errors.OpExecError("Can't connect to node %s." % node)
3624

    
3625
    if instance.name not in node_insts:
3626
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3627

    
3628
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3629

    
3630
    hyper = hypervisor.GetHypervisor()
3631
    console_cmd = hyper.GetShellCommandForConsole(instance)
3632

    
3633
    # build ssh cmdline
3634
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3635

    
3636

    
3637
class LUReplaceDisks(LogicalUnit):
3638
  """Replace the disks of an instance.
3639

3640
  """
3641
  HPATH = "mirrors-replace"
3642
  HTYPE = constants.HTYPE_INSTANCE
3643
  _OP_REQP = ["instance_name", "mode", "disks"]
3644
  REQ_BGL = False
3645

    
3646
  def ExpandNames(self):
3647
    self._ExpandAndLockInstance()
3648

    
3649
    if not hasattr(self.op, "remote_node"):
3650
      self.op.remote_node = None
3651

    
3652
    ia_name = getattr(self.op, "iallocator", None)
3653
    if ia_name is not None:
3654
      if self.op.remote_node is not None:
3655
        raise errors.OpPrereqError("Give either the iallocator or the new"
3656
                                   " secondary, not both")
3657
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3658
    elif self.op.remote_node is not None:
3659
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3660
      if remote_node is None:
3661
        raise errors.OpPrereqError("Node '%s' not known" %
3662
                                   self.op.remote_node)
3663
      self.op.remote_node = remote_node
3664
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
3665
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3666
    else:
3667
      self.needed_locks[locking.LEVEL_NODE] = []
3668
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3669

    
3670
  def DeclareLocks(self, level):
3671
    # If we're not already locking all nodes in the set we have to declare the
3672
    # instance's primary/secondary nodes.
3673
    if (level == locking.LEVEL_NODE and
3674
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
3675
      self._LockInstancesNodes()
3676

    
3677
  def _RunAllocator(self):
3678
    """Compute a new secondary node using an IAllocator.
3679

3680
    """
3681
    ial = IAllocator(self.cfg, self.sstore,
3682
                     mode=constants.IALLOCATOR_MODE_RELOC,
3683
                     name=self.op.instance_name,
3684
                     relocate_from=[self.sec_node])
3685

    
3686
    ial.Run(self.op.iallocator)
3687

    
3688
    if not ial.success:
3689
      raise errors.OpPrereqError("Can't compute nodes using"
3690
                                 " iallocator '%s': %s" % (self.op.iallocator,
3691
                                                           ial.info))
3692
    if len(ial.nodes) != ial.required_nodes:
3693
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3694
                                 " of nodes (%s), required %s" %
3695
                                 (len(ial.nodes), ial.required_nodes))
3696
    self.op.remote_node = ial.nodes[0]
3697
    logger.ToStdout("Selected new secondary for the instance: %s" %
3698
                    self.op.remote_node)
3699

    
3700
  def BuildHooksEnv(self):
3701
    """Build hooks env.
3702

3703
    This runs on the master, the primary and all the secondaries.
3704

3705
    """
3706
    env = {
3707
      "MODE": self.op.mode,
3708
      "NEW_SECONDARY": self.op.remote_node,
3709
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3710
      }
3711
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3712
    nl = [
3713
      self.sstore.GetMasterNode(),
3714
      self.instance.primary_node,
3715
      ]
3716
    if self.op.remote_node is not None:
3717
      nl.append(self.op.remote_node)
3718
    return env, nl, nl
3719

    
3720
  def CheckPrereq(self):
3721
    """Check prerequisites.
3722

3723
    This checks that the instance is in the cluster.
3724

3725
    """
3726
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3727
    assert instance is not None, \
3728
      "Cannot retrieve locked instance %s" % self.op.instance_name
3729
    self.instance = instance
3730

    
3731
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3732
      raise errors.OpPrereqError("Instance's disk layout is not"
3733
                                 " network mirrored.")
3734

    
3735
    if len(instance.secondary_nodes) != 1:
3736
      raise errors.OpPrereqError("The instance has a strange layout,"
3737
                                 " expected one secondary but found %d" %
3738
                                 len(instance.secondary_nodes))
3739

    
3740
    self.sec_node = instance.secondary_nodes[0]
3741

    
3742
    ia_name = getattr(self.op, "iallocator", None)
3743
    if ia_name is not None:
3744
      self._RunAllocator()
3745

    
3746
    remote_node = self.op.remote_node
3747
    if remote_node is not None:
3748
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3749
      assert self.remote_node_info is not None, \
3750
        "Cannot retrieve locked node %s" % remote_node
3751
    else:
3752
      self.remote_node_info = None
3753
    if remote_node == instance.primary_node:
3754
      raise errors.OpPrereqError("The specified node is the primary node of"
3755
                                 " the instance.")
3756
    elif remote_node == self.sec_node:
3757
      if self.op.mode == constants.REPLACE_DISK_SEC:
3758
        # this is for DRBD8, where we can't execute the same mode of
3759
        # replacement as for drbd7 (no different port allocated)
3760
        raise errors.OpPrereqError("Same secondary given, cannot execute"
3761
                                   " replacement")
3762
    if instance.disk_template == constants.DT_DRBD8:
3763
      if (self.op.mode == constants.REPLACE_DISK_ALL and
3764
          remote_node is not None):
3765
        # switch to replace secondary mode
3766
        self.op.mode = constants.REPLACE_DISK_SEC
3767

    
3768
      if self.op.mode == constants.REPLACE_DISK_ALL:
3769
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3770
                                   " secondary disk replacement, not"
3771
                                   " both at once")
3772
      elif self.op.mode == constants.REPLACE_DISK_PRI:
3773
        if remote_node is not None:
3774
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3775
                                     " the secondary while doing a primary"
3776
                                     " node disk replacement")
3777
        self.tgt_node = instance.primary_node
3778
        self.oth_node = instance.secondary_nodes[0]
3779
      elif self.op.mode == constants.REPLACE_DISK_SEC:
3780
        self.new_node = remote_node # this can be None, in which case
3781
                                    # we don't change the secondary
3782
        self.tgt_node = instance.secondary_nodes[0]
3783
        self.oth_node = instance.primary_node
3784
      else:
3785
        raise errors.ProgrammerError("Unhandled disk replace mode")
3786

    
3787
    for name in self.op.disks:
3788
      if instance.FindDisk(name) is None:
3789
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3790
                                   (name, instance.name))
3791

    
3792
  def _ExecD8DiskOnly(self, feedback_fn):
3793
    """Replace a disk on the primary or secondary for dbrd8.
3794

3795
    The algorithm for replace is quite complicated:
3796
      - for each disk to be replaced:
3797
        - create new LVs on the target node with unique names
3798
        - detach old LVs from the drbd device
3799
        - rename old LVs to name_replaced.<time_t>
3800
        - rename new LVs to old LVs
3801
        - attach the new LVs (with the old names now) to the drbd device
3802
      - wait for sync across all devices
3803
      - for each modified disk:
3804
        - remove old LVs (which have the name name_replaces.<time_t>)
3805

3806
    Failures are not very well handled.
3807

3808
    """
3809
    steps_total = 6
3810
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3811
    instance = self.instance
3812
    iv_names = {}
3813
    vgname = self.cfg.GetVGName()
3814
    # start of work
3815
    cfg = self.cfg
3816
    tgt_node = self.tgt_node
3817
    oth_node = self.oth_node
3818

    
3819
    # Step: check device activation
3820
    self.proc.LogStep(1, steps_total, "check device existence")
3821
    info("checking volume groups")
3822
    my_vg = cfg.GetVGName()
3823
    results = rpc.call_vg_list([oth_node, tgt_node])
3824
    if not results:
3825
      raise errors.OpExecError("Can't list volume groups on the nodes")
3826
    for node in oth_node, tgt_node:
3827
      res = results.get(node, False)
3828
      if not res or my_vg not in res:
3829
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3830
                                 (my_vg, node))
3831
    for dev in instance.disks:
3832
      if not dev.iv_name in self.op.disks:
3833
        continue
3834
      for node in tgt_node, oth_node:
3835
        info("checking %s on %s" % (dev.iv_name, node))
3836
        cfg.SetDiskID(dev, node)
3837
        if not rpc.call_blockdev_find(node, dev):
3838
          raise errors.OpExecError("Can't find device %s on node %s" %
3839
                                   (dev.iv_name, node))
3840

    
3841
    # Step: check other node consistency
3842
    self.proc.LogStep(2, steps_total, "check peer consistency")
3843
    for dev in instance.disks:
3844
      if not dev.iv_name in self.op.disks:
3845
        continue
3846
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3847
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3848
                                   oth_node==instance.primary_node):
3849
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3850
                                 " to replace disks on this node (%s)" %
3851
                                 (oth_node, tgt_node))
3852

    
3853
    # Step: create new storage
3854
    self.proc.LogStep(3, steps_total, "allocate new storage")
3855
    for dev in instance.disks:
3856
      if not dev.iv_name in self.op.disks:
3857
        continue
3858
      size = dev.size
3859
      cfg.SetDiskID(dev, tgt_node)
3860
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3861
      names = _GenerateUniqueNames(cfg, lv_names)
3862
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3863
                             logical_id=(vgname, names[0]))
3864
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3865
                             logical_id=(vgname, names[1]))
3866
      new_lvs = [lv_data, lv_meta]
3867
      old_lvs = dev.children
3868
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3869
      info("creating new local storage on %s for %s" %
3870
           (tgt_node, dev.iv_name))
3871
      # since we *always* want to create this LV, we use the
3872
      # _Create...OnPrimary (which forces the creation), even if we
3873
      # are talking about the secondary node
3874
      for new_lv in new_lvs:
3875
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3876
                                        _GetInstanceInfoText(instance)):
3877
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3878
                                   " node '%s'" %
3879
                                   (new_lv.logical_id[1], tgt_node))
3880

    
3881
    # Step: for each lv, detach+rename*2+attach
3882
    self.proc.LogStep(4, steps_total, "change drbd configuration")
3883
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3884
      info("detaching %s drbd from local storage" % dev.iv_name)
3885
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3886
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3887
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3888
      #dev.children = []
3889
      #cfg.Update(instance)
3890

    
3891
      # ok, we created the new LVs, so now we know we have the needed
3892
      # storage; as such, we proceed on the target node to rename
3893
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3894
      # using the assumption that logical_id == physical_id (which in
3895
      # turn is the unique_id on that node)
3896

    
3897
      # FIXME(iustin): use a better name for the replaced LVs
3898
      temp_suffix = int(time.time())
3899
      ren_fn = lambda d, suff: (d.physical_id[0],
3900
                                d.physical_id[1] + "_replaced-%s" % suff)
3901
      # build the rename list based on what LVs exist on the node
3902
      rlist = []
3903
      for to_ren in old_lvs:
3904
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3905
        if find_res is not None: # device exists
3906
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3907

    
3908
      info("renaming the old LVs on the target node")
3909
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3910
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3911
      # now we rename the new LVs to the old LVs
3912
      info("renaming the new LVs on the target node")
3913
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3914
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3915
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3916

    
3917
      for old, new in zip(old_lvs, new_lvs):
3918
        new.logical_id = old.logical_id
3919
        cfg.SetDiskID(new, tgt_node)
3920

    
3921
      for disk in old_lvs:
3922
        disk.logical_id = ren_fn(disk, temp_suffix)
3923
        cfg.SetDiskID(disk, tgt_node)
3924

    
3925
      # now that the new lvs have the old name, we can add them to the device
3926
      info("adding new mirror component on %s" % tgt_node)
3927
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3928
        for new_lv in new_lvs:
3929
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
3930
            warning("Can't rollback device %s", hint="manually cleanup unused"
3931
                    " logical volumes")
3932
        raise errors.OpExecError("Can't add local storage to drbd")
3933

    
3934
      dev.children = new_lvs
3935
      cfg.Update(instance)
3936

    
3937
    # Step: wait for sync
3938

    
3939
    # this can fail as the old devices are degraded and _WaitForSync
3940
    # does a combined result over all disks, so we don't check its
3941
    # return value
3942
    self.proc.LogStep(5, steps_total, "sync devices")
3943
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3944

    
3945
    # so check manually all the devices
3946
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3947
      cfg.SetDiskID(dev, instance.primary_node)
3948
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3949
      if is_degr:
3950
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3951

    
3952
    # Step: remove old storage
3953
    self.proc.LogStep(6, steps_total, "removing old storage")
3954
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3955
      info("remove logical volumes for %s" % name)
3956
      for lv in old_lvs:
3957
        cfg.SetDiskID(lv, tgt_node)
3958
        if not rpc.call_blockdev_remove(tgt_node, lv):
3959
          warning("Can't remove old LV", hint="manually remove unused LVs")
3960
          continue
3961

    
3962
  def _ExecD8Secondary(self, feedback_fn):
3963
    """Replace the secondary node for drbd8.
3964

3965
    The algorithm for replace is quite complicated:
3966
      - for all disks of the instance:
3967
        - create new LVs on the new node with same names
3968
        - shutdown the drbd device on the old secondary
3969
        - disconnect the drbd network on the primary
3970
        - create the drbd device on the new secondary
3971
        - network attach the drbd on the primary, using an artifice:
3972
          the drbd code for Attach() will connect to the network if it
3973
          finds a device which is connected to the good local disks but
3974
          not network enabled
3975
      - wait for sync across all devices
3976
      - remove all disks from the old secondary
3977

3978
    Failures are not very well handled.
3979

3980
    """
3981
    steps_total = 6
3982
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3983
    instance = self.instance
3984
    iv_names = {}
3985
    vgname = self.cfg.GetVGName()
3986
    # start of work
3987
    cfg = self.cfg
3988
    old_node = self.tgt_node
3989
    new_node = self.new_node
3990
    pri_node = instance.primary_node
3991

    
3992
    # Step: check device activation
3993
    self.proc.LogStep(1, steps_total, "check device existence")
3994
    info("checking volume groups")
3995
    my_vg = cfg.GetVGName()
3996
    results = rpc.call_vg_list([pri_node, new_node])
3997
    if not results:
3998
      raise errors.OpExecError("Can't list volume groups on the nodes")
3999
    for node in pri_node, new_node:
4000
      res = results.get(node, False)
4001
      if not res or my_vg not in res:
4002
        raise errors.OpExecError("Volume group '%s' not found on %s" %
4003
                                 (my_vg, node))
4004
    for dev in instance.disks:
4005
      if not dev.iv_name in self.op.disks:
4006
        continue
4007
      info("checking %s on %s" % (dev.iv_name, pri_node))
4008
      cfg.SetDiskID(dev, pri_node)
4009
      if not rpc.call_blockdev_find(pri_node, dev):
4010
        raise errors.OpExecError("Can't find device %s on node %s" %
4011
                                 (dev.iv_name, pri_node))
4012

    
4013
    # Step: check other node consistency
4014
    self.proc.LogStep(2, steps_total, "check peer consistency")
4015
    for dev in instance.disks:
4016
      if not dev.iv_name in self.op.disks:
4017
        continue
4018
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4019
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
4020
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
4021
                                 " unsafe to replace the secondary" %
4022
                                 pri_node)
4023

    
4024
    # Step: create new storage
4025
    self.proc.LogStep(3, steps_total, "allocate new storage")
4026
    for dev in instance.disks:
4027
      size = dev.size
4028
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4029
      # since we *always* want to create this LV, we use the
4030
      # _Create...OnPrimary (which forces the creation), even if we
4031
      # are talking about the secondary node
4032
      for new_lv in dev.children:
4033
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
4034
                                        _GetInstanceInfoText(instance)):
4035
          raise errors.OpExecError("Failed to create new LV named '%s' on"
4036
                                   " node '%s'" %
4037
                                   (new_lv.logical_id[1], new_node))
4038

    
4039

    
4040
    # Step 4: dbrd minors and drbd setups changes
4041
    # after this, we must manually remove the drbd minors on both the
4042
    # error and the success paths
4043
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4044
                                   instance.name)
4045
    logging.debug("Allocated minors %s" % (minors,))
4046
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
4047
    for dev, new_minor in zip(instance.disks, minors):
4048
      size = dev.size
4049
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4050
      # create new devices on new_node
4051
      if pri_node == dev.logical_id[0]:
4052
        new_logical_id = (pri_node, new_node,
4053
                          dev.logical_id[2], dev.logical_id[3], new_minor)
4054
      else:
4055
        new_logical_id = (new_node, pri_node,
4056
                          dev.logical_id[2], new_minor, dev.logical_id[4])
4057
      iv_names[dev.iv_name] = (dev, dev.children, new_logical_id)
4058
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4059
                    new_logical_id)
4060
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4061
                              logical_id=new_logical_id,
4062
                              children=dev.children)
4063
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
4064
                                        new_drbd, False,
4065
                                      _GetInstanceInfoText(instance)):
4066
        self.cfg.ReleaseDRBDMinors(instance.name)
4067
        raise errors.OpExecError("Failed to create new DRBD on"
4068
                                 " node '%s'" % new_node)
4069

    
4070
    for dev in instance.disks:
4071
      # we have new devices, shutdown the drbd on the old secondary
4072
      info("shutting down drbd for %s on old node" % dev.iv_name)
4073
      cfg.SetDiskID(dev, old_node)
4074
      if not rpc.call_blockdev_shutdown(old_node, dev):
4075
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4076
                hint="Please cleanup this device manually as soon as possible")
4077

    
4078
    info("detaching primary drbds from the network (=> standalone)")
4079
    done = 0
4080
    for dev in instance.disks:
4081
      cfg.SetDiskID(dev, pri_node)
4082
      # set the physical (unique in bdev terms) id to None, meaning
4083
      # detach from network
4084
      dev.physical_id = (None, None, None, None, dev.physical_id[4])
4085
      # and 'find' the device, which will 'fix' it to match the
4086
      # standalone state
4087
      if rpc.call_blockdev_find(pri_node, dev):
4088
        done += 1
4089
      else:
4090
        warning("Failed to detach drbd %s from network, unusual case" %
4091
                dev.iv_name)
4092

    
4093
    if not done:
4094
      # no detaches succeeded (very unlikely)
4095
      self.cfg.ReleaseDRBDMinors(instance.name)
4096
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
4097

    
4098
    # if we managed to detach at least one, we update all the disks of
4099
    # the instance to point to the new secondary
4100
    info("updating instance configuration")
4101
    for dev, _, new_logical_id in iv_names.itervalues():
4102
      dev.logical_id = new_logical_id
4103
      cfg.SetDiskID(dev, pri_node)
4104
    cfg.Update(instance)
4105
    # we can remove now the temp minors as now the new values are
4106
    # written to the config file (and therefore stable)
4107
    self.cfg.ReleaseDRBDMinors(instance.name)
4108

    
4109
    # and now perform the drbd attach
4110
    info("attaching primary drbds to new secondary (standalone => connected)")
4111
    failures = []
4112
    for dev in instance.disks:
4113
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4114
      # since the attach is smart, it's enough to 'find' the device,
4115
      # it will automatically activate the network, if the physical_id
4116
      # is correct
4117
      cfg.SetDiskID(dev, pri_node)
4118
      logging.debug("Disk to attach: %s", dev)
4119
      if not rpc.call_blockdev_find(pri_node, dev):
4120
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4121
                "please do a gnt-instance info to see the status of disks")
4122

    
4123
    # this can fail as the old devices are degraded and _WaitForSync
4124
    # does a combined result over all disks, so we don't check its
4125
    # return value
4126
    self.proc.LogStep(5, steps_total, "sync devices")
4127
    _WaitForSync(cfg, instance, self.proc, unlock=True)
4128

    
4129
    # so check manually all the devices
4130
    for name, (dev, old_lvs, _) in iv_names.iteritems():
4131
      cfg.SetDiskID(dev, pri_node)
4132
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
4133
      if is_degr:
4134
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
4135

    
4136
    self.proc.LogStep(6, steps_total, "removing old storage")
4137
    for name, (dev, old_lvs, _) in iv_names.iteritems():
4138
      info("remove logical volumes for %s" % name)
4139
      for lv in old_lvs:
4140
        cfg.SetDiskID(lv, old_node)
4141
        if not rpc.call_blockdev_remove(old_node, lv):
4142
          warning("Can't remove LV on old secondary",
4143
                  hint="Cleanup stale volumes by hand")
4144

    
4145
  def Exec(self, feedback_fn):
4146
    """Execute disk replacement.
4147

4148
    This dispatches the disk replacement to the appropriate handler.
4149

4150
    """
4151
    instance = self.instance
4152

    
4153
    # Activate the instance disks if we're replacing them on a down instance
4154
    if instance.status == "down":
4155
      _StartInstanceDisks(self.cfg, instance, True)
4156

    
4157
    if instance.disk_template == constants.DT_DRBD8:
4158
      if self.op.remote_node is None:
4159
        fn = self._ExecD8DiskOnly
4160
      else:
4161
        fn = self._ExecD8Secondary
4162
    else:
4163
      raise errors.ProgrammerError("Unhandled disk replacement case")
4164

    
4165
    ret = fn(feedback_fn)
4166

    
4167
    # Deactivate the instance disks if we're replacing them on a down instance
4168
    if instance.status == "down":
4169
      _SafeShutdownInstanceDisks(instance, self.cfg)
4170

    
4171
    return ret
4172

    
4173

    
4174
class LUGrowDisk(LogicalUnit):
4175
  """Grow a disk of an instance.
4176

4177
  """
4178
  HPATH = "disk-grow"
4179
  HTYPE = constants.HTYPE_INSTANCE
4180
  _OP_REQP = ["instance_name", "disk", "amount"]
4181
  REQ_BGL = False
4182

    
4183
  def ExpandNames(self):
4184
    self._ExpandAndLockInstance()
4185
    self.needed_locks[locking.LEVEL_NODE] = []
4186
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4187

    
4188
  def DeclareLocks(self, level):
4189
    if level == locking.LEVEL_NODE:
4190
      self._LockInstancesNodes()
4191

    
4192
  def BuildHooksEnv(self):
4193
    """Build hooks env.
4194

4195
    This runs on the master, the primary and all the secondaries.
4196

4197
    """
4198
    env = {
4199
      "DISK": self.op.disk,
4200
      "AMOUNT": self.op.amount,
4201
      }
4202
    env.update(_BuildInstanceHookEnvByObject(self.instance))
4203
    nl = [
4204
      self.sstore.GetMasterNode(),
4205
      self.instance.primary_node,
4206
      ]
4207
    return env, nl, nl
4208

    
4209
  def CheckPrereq(self):
4210
    """Check prerequisites.
4211

4212
    This checks that the instance is in the cluster.
4213

4214
    """
4215
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4216
    assert instance is not None, \
4217
      "Cannot retrieve locked instance %s" % self.op.instance_name
4218

    
4219
    self.instance = instance
4220

    
4221
    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4222
      raise errors.OpPrereqError("Instance's disk layout does not support"
4223
                                 " growing.")
4224

    
4225
    if instance.FindDisk(self.op.disk) is None:
4226
      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4227
                                 (self.op.disk, instance.name))
4228

    
4229
    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4230
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
4231
    for node in nodenames:
4232
      info = nodeinfo.get(node, None)
4233
      if not info:
4234
        raise errors.OpPrereqError("Cannot get current information"
4235
                                   " from node '%s'" % node)
4236
      vg_free = info.get('vg_free', None)
4237
      if not isinstance(vg_free, int):
4238
        raise errors.OpPrereqError("Can't compute free disk space on"
4239
                                   " node %s" % node)
4240
      if self.op.amount > info['vg_free']:
4241
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
4242
                                   " %d MiB available, %d MiB required" %
4243
                                   (node, info['vg_free'], self.op.amount))
4244

    
4245
  def Exec(self, feedback_fn):
4246
    """Execute disk grow.
4247

4248
    """
4249
    instance = self.instance
4250
    disk = instance.FindDisk(self.op.disk)
4251
    for node in (instance.secondary_nodes + (instance.primary_node,)):
4252
      self.cfg.SetDiskID(disk, node)
4253
      result = rpc.call_blockdev_grow(node, disk, self.op.amount)
4254
      if not result or not isinstance(result, (list, tuple)) or len(result) != 2:
4255
        raise errors.OpExecError("grow request failed to node %s" % node)
4256
      elif not result[0]:
4257
        raise errors.OpExecError("grow request failed to node %s: %s" %
4258
                                 (node, result[1]))
4259
    disk.RecordGrow(self.op.amount)
4260
    self.cfg.Update(instance)
4261
    return
4262

    
4263

    
4264
class LUQueryInstanceData(NoHooksLU):
4265
  """Query runtime instance data.
4266

4267
  """
4268
  _OP_REQP = ["instances"]
4269
  REQ_BGL = False
4270
  def ExpandNames(self):
4271
    self.needed_locks = {}
4272
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4273

    
4274
    if not isinstance(self.op.instances, list):
4275
      raise errors.OpPrereqError("Invalid argument type 'instances'")
4276

    
4277
    if self.op.instances:
4278
      self.wanted_names = []
4279
      for name in self.op.instances:
4280
        full_name = self.cfg.ExpandInstanceName(name)
4281
        if full_name is None:
4282
          raise errors.OpPrereqError("Instance '%s' not known" %
4283
                                     self.op.instance_name)
4284
        self.wanted_names.append(full_name)
4285
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4286
    else:
4287
      self.wanted_names = None
4288
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4289

    
4290
    self.needed_locks[locking.LEVEL_NODE] = []
4291
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4292

    
4293
  def DeclareLocks(self, level):
4294
    if level == locking.LEVEL_NODE:
4295
      self._LockInstancesNodes()
4296

    
4297
  def CheckPrereq(self):
4298
    """Check prerequisites.
4299

4300
    This only checks the optional instance list against the existing names.
4301

4302
    """
4303
    if self.wanted_names is None:
4304
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4305

    
4306
    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4307
                             in self.wanted_names]
4308
    return
4309

    
4310
  def _ComputeDiskStatus(self, instance, snode, dev):
4311
    """Compute block device status.
4312

4313
    """
4314
    self.cfg.SetDiskID(dev, instance.primary_node)
4315
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4316
    if dev.dev_type in constants.LDS_DRBD:
4317
      # we change the snode then (otherwise we use the one passed in)
4318
      if dev.logical_id[0] == instance.primary_node:
4319
        snode = dev.logical_id[1]
4320
      else:
4321
        snode = dev.logical_id[0]
4322

    
4323
    if snode:
4324
      self.cfg.SetDiskID(dev, snode)
4325
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
4326
    else:
4327
      dev_sstatus = None
4328

    
4329
    if dev.children:
4330
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
4331
                      for child in dev.children]
4332
    else:
4333
      dev_children = []
4334

    
4335
    data = {
4336
      "iv_name": dev.iv_name,
4337
      "dev_type": dev.dev_type,
4338
      "logical_id": dev.logical_id,
4339
      "physical_id": dev.physical_id,
4340
      "pstatus": dev_pstatus,
4341
      "sstatus": dev_sstatus,
4342
      "children": dev_children,
4343
      }
4344

    
4345
    return data
4346

    
4347
  def Exec(self, feedback_fn):
4348
    """Gather and return data"""
4349
    result = {}
4350
    for instance in self.wanted_instances:
4351
      remote_info = rpc.call_instance_info(instance.primary_node,
4352
                                                instance.name)
4353
      if remote_info and "state" in remote_info:
4354
        remote_state = "up"
4355
      else:
4356
        remote_state = "down"
4357
      if instance.status == "down":
4358
        config_state = "down"
4359
      else:
4360
        config_state = "up"
4361

    
4362
      disks = [self._ComputeDiskStatus(instance, None, device)
4363
               for device in instance.disks]
4364

    
4365
      idict = {
4366
        "name": instance.name,
4367
        "config_state": config_state,
4368
        "run_state": remote_state,
4369
        "pnode": instance.primary_node,
4370
        "snodes": instance.secondary_nodes,
4371
        "os": instance.os,
4372
        "memory": instance.memory,
4373
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4374
        "disks": disks,
4375
        "vcpus": instance.vcpus,
4376
        }
4377

    
4378
      htkind = self.sstore.GetHypervisorType()
4379
      if htkind == constants.HT_XEN_PVM30:
4380
        idict["kernel_path"] = instance.kernel_path
4381
        idict["initrd_path"] = instance.initrd_path
4382

    
4383
      if htkind == constants.HT_XEN_HVM31:
4384
        idict["hvm_boot_order"] = instance.hvm_boot_order
4385
        idict["hvm_acpi"] = instance.hvm_acpi
4386
        idict["hvm_pae"] = instance.hvm_pae
4387
        idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4388
        idict["hvm_nic_type"] = instance.hvm_nic_type
4389
        idict["hvm_disk_type"] = instance.hvm_disk_type
4390

    
4391
      if htkind in constants.HTS_REQ_PORT:
4392
        if instance.vnc_bind_address is None:
4393
          vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4394
        else:
4395
          vnc_bind_address = instance.vnc_bind_address
4396
        if instance.network_port is None:
4397
          vnc_console_port = None
4398
        elif vnc_bind_address == constants.BIND_ADDRESS_GLOBAL:
4399
          vnc_console_port = "%s:%s" % (instance.primary_node,
4400
                                       instance.network_port)
4401
        elif vnc_bind_address == constants.LOCALHOST_IP_ADDRESS:
4402
          vnc_console_port = "%s:%s on node %s" % (vnc_bind_address,
4403
                                                   instance.network_port,
4404
                                                   instance.primary_node)
4405
        else:
4406
          vnc_console_port = "%s:%s" % (instance.vnc_bind_address,
4407
                                        instance.network_port)
4408
        idict["vnc_console_port"] = vnc_console_port
4409
        idict["vnc_bind_address"] = vnc_bind_address
4410
        idict["network_port"] = instance.network_port
4411

    
4412
      result[instance.name] = idict
4413

    
4414
    return result
4415

    
4416

    
4417
class LUSetInstanceParams(LogicalUnit):
4418
  """Modifies an instances's parameters.
4419

4420
  """
4421
  HPATH = "instance-modify"
4422
  HTYPE = constants.HTYPE_INSTANCE
4423
  _OP_REQP = ["instance_name"]
4424
  REQ_BGL = False
4425

    
4426
  def ExpandNames(self):
4427
    self._ExpandAndLockInstance()
4428

    
4429
  def BuildHooksEnv(self):
4430
    """Build hooks env.
4431

4432
    This runs on the master, primary and secondaries.
4433

4434
    """
4435
    args = dict()
4436
    if self.mem:
4437
      args['memory'] = self.mem
4438
    if self.vcpus:
4439
      args['vcpus'] = self.vcpus
4440
    if self.do_ip or self.do_bridge or self.mac:
4441
      if self.do_ip:
4442
        ip = self.ip
4443
      else:
4444
        ip = self.instance.nics[0].ip
4445
      if self.bridge:
4446
        bridge = self.bridge
4447
      else:
4448
        bridge = self.instance.nics[0].bridge
4449
      if self.mac:
4450
        mac = self.mac
4451
      else:
4452
        mac = self.instance.nics[0].mac
4453
      args['nics'] = [(ip, bridge, mac)]
4454
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4455
    nl = [self.sstore.GetMasterNode(),
4456
          self.instance.primary_node] + list(self.instance.secondary_nodes)
4457
    return env, nl, nl
4458

    
4459
  def CheckPrereq(self):
4460
    """Check prerequisites.
4461

4462
    This only checks the instance list against the existing names.
4463

4464
    """
4465
    # FIXME: all the parameters could be checked before, in ExpandNames, or in
4466
    # a separate CheckArguments function, if we implement one, so the operation
4467
    # can be aborted without waiting for any lock, should it have an error...
4468
    self.mem = getattr(self.op, "mem", None)
4469
    self.vcpus = getattr(self.op, "vcpus", None)
4470
    self.ip = getattr(self.op, "ip", None)
4471
    self.mac = getattr(self.op, "mac", None)
4472
    self.bridge = getattr(self.op, "bridge", None)
4473
    self.kernel_path = getattr(self.op, "kernel_path", None)
4474
    self.initrd_path = getattr(self.op, "initrd_path", None)
4475
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4476
    self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4477
    self.hvm_pae = getattr(self.op, "hvm_pae", None)
4478
    self.hvm_nic_type = getattr(self.op, "hvm_nic_type", None)
4479
    self.hvm_disk_type = getattr(self.op, "hvm_disk_type", None)
4480
    self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4481
    self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4482
    self.force = getattr(self.op, "force", None)
4483
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4484
                 self.kernel_path, self.initrd_path, self.hvm_boot_order,
4485
                 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4486
                 self.vnc_bind_address, self.hvm_nic_type, self.hvm_disk_type]
4487
    if all_parms.count(None) == len(all_parms):
4488
      raise errors.OpPrereqError("No changes submitted")
4489
    if self.mem is not None:
4490
      try:
4491
        self.mem = int(self.mem)
4492
      except ValueError, err:
4493
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4494
    if self.vcpus is not None:
4495
      try:
4496
        self.vcpus = int(self.vcpus)
4497
      except ValueError, err:
4498
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4499
    if self.ip is not None:
4500
      self.do_ip = True
4501
      if self.ip.lower() == "none":
4502
        self.ip = None
4503
      else:
4504
        if not utils.IsValidIP(self.ip):
4505
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4506
    else:
4507
      self.do_ip = False
4508
    self.do_bridge = (self.bridge is not None)
4509
    if self.mac is not None:
4510
      if self.cfg.IsMacInUse(self.mac):
4511
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4512
                                   self.mac)
4513
      if not utils.IsValidMac(self.mac):
4514
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4515

    
4516
    if self.kernel_path is not None:
4517
      self.do_kernel_path = True
4518
      if self.kernel_path == constants.VALUE_NONE:
4519
        raise errors.OpPrereqError("Can't set instance to no kernel")
4520

    
4521
      if self.kernel_path != constants.VALUE_DEFAULT:
4522
        if not os.path.isabs(self.kernel_path):
4523
          raise errors.OpPrereqError("The kernel path must be an absolute"
4524
                                    " filename")
4525
    else:
4526
      self.do_kernel_path = False
4527

    
4528
    if self.initrd_path is not None:
4529
      self.do_initrd_path = True
4530
      if self.initrd_path not in (constants.VALUE_NONE,
4531
                                  constants.VALUE_DEFAULT):
4532
        if not os.path.isabs(self.initrd_path):
4533
          raise errors.OpPrereqError("The initrd path must be an absolute"
4534
                                    " filename")
4535
    else:
4536
      self.do_initrd_path = False
4537

    
4538
    # boot order verification
4539
    if self.hvm_boot_order is not None:
4540
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
4541
        if len(self.hvm_boot_order.strip("acdn")) != 0:
4542
          raise errors.OpPrereqError("invalid boot order specified,"
4543
                                     " must be one or more of [acdn]"
4544
                                     " or 'default'")
4545

    
4546
    # hvm_cdrom_image_path verification
4547
    if self.op.hvm_cdrom_image_path is not None:
4548
      if not (os.path.isabs(self.op.hvm_cdrom_image_path) or
4549
              self.op.hvm_cdrom_image_path.lower() == "none"):
4550
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
4551
                                   " be an absolute path or None, not %s" %
4552
                                   self.op.hvm_cdrom_image_path)
4553
      if not (os.path.isfile(self.op.hvm_cdrom_image_path) or
4554
              self.op.hvm_cdrom_image_path.lower() == "none"):
4555
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
4556
                                   " regular file or a symlink pointing to"
4557
                                   " an existing regular file, not %s" %
4558
                                   self.op.hvm_cdrom_image_path)
4559

    
4560
    # vnc_bind_address verification
4561
    if self.op.vnc_bind_address is not None:
4562
      if not utils.IsValidIP(self.op.vnc_bind_address):
4563
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
4564
                                   " like a valid IP address" %
4565
                                   self.op.vnc_bind_address)
4566

    
4567
    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4568
    assert self.instance is not None, \
4569
      "Cannot retrieve locked instance %s" % self.op.instance_name
4570
    self.warn = []
4571
    if self.mem is not None and not self.force:
4572
      pnode = self.instance.primary_node
4573
      nodelist = [pnode]
4574
      nodelist.extend(instance.secondary_nodes)
4575
      instance_info = rpc.call_instance_info(pnode, instance.name)
4576
      nodeinfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
4577

    
4578
      if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
4579
        # Assume the primary node is unreachable and go ahead
4580
        self.warn.append("Can't get info from primary node %s" % pnode)
4581
      else:
4582
        if instance_info:
4583
          current_mem = instance_info['memory']
4584
        else:
4585
          # Assume instance not running
4586
          # (there is a slight race condition here, but it's not very probable,
4587
          # and we have no other way to check)
4588
          current_mem = 0
4589
        miss_mem = self.mem - current_mem - nodeinfo[pnode]['memory_free']
4590
        if miss_mem > 0:
4591
          raise errors.OpPrereqError("This change will prevent the instance"
4592
                                     " from starting, due to %d MB of memory"
4593
                                     " missing on its primary node" % miss_mem)
4594

    
4595
      for node in instance.secondary_nodes:
4596
        if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
4597
          self.warn.append("Can't get info from secondary node %s" % node)
4598
        elif self.mem > nodeinfo[node]['memory_free']:
4599
          self.warn.append("Not enough memory to failover instance to secondary"
4600
                           " node %s" % node)
4601

    
4602
    # Xen HVM device type checks
4603
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
4604
      if self.op.hvm_nic_type is not None:
4605
        if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
4606
          raise errors.OpPrereqError("Invalid NIC type %s specified for Xen"
4607
                                     " HVM  hypervisor" % self.op.hvm_nic_type)
4608
      if self.op.hvm_disk_type is not None:
4609
        if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
4610
          raise errors.OpPrereqError("Invalid disk type %s specified for Xen"
4611
                                     " HVM hypervisor" % self.op.hvm_disk_type)
4612

    
4613
    return
4614

    
4615
  def Exec(self, feedback_fn):
4616
    """Modifies an instance.
4617

4618
    All parameters take effect only at the next restart of the instance.
4619
    """
4620
    # Process here the warnings from CheckPrereq, as we don't have a
4621
    # feedback_fn there.
4622
    for warn in self.warn:
4623
      feedback_fn("WARNING: %s" % warn)
4624

    
4625
    result = []
4626
    instance = self.instance
4627
    if self.mem:
4628
      instance.memory = self.mem
4629
      result.append(("mem", self.mem))
4630
    if self.vcpus:
4631
      instance.vcpus = self.vcpus
4632
      result.append(("vcpus",  self.vcpus))
4633
    if self.do_ip:
4634
      instance.nics[0].ip = self.ip
4635
      result.append(("ip", self.ip))
4636
    if self.bridge:
4637
      instance.nics[0].bridge = self.bridge
4638
      result.append(("bridge", self.bridge))
4639
    if self.mac:
4640
      instance.nics[0].mac = self.mac
4641
      result.append(("mac", self.mac))
4642
    if self.do_kernel_path:
4643
      instance.kernel_path = self.kernel_path
4644
      result.append(("kernel_path", self.kernel_path))
4645
    if self.do_initrd_path:
4646
      instance.initrd_path = self.initrd_path
4647
      result.append(("initrd_path", self.initrd_path))
4648
    if self.hvm_boot_order:
4649
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
4650
        instance.hvm_boot_order = None
4651
      else:
4652
        instance.hvm_boot_order = self.hvm_boot_order
4653
      result.append(("hvm_boot_order", self.hvm_boot_order))
4654
    if self.hvm_acpi is not None:
4655
      instance.hvm_acpi = self.hvm_acpi
4656
      result.append(("hvm_acpi", self.hvm_acpi))
4657
    if self.hvm_pae is not None:
4658
      instance.hvm_pae = self.hvm_pae
4659
      result.append(("hvm_pae", self.hvm_pae))
4660
    if self.hvm_nic_type is not None:
4661
      instance.hvm_nic_type = self.hvm_nic_type
4662
      result.append(("hvm_nic_type", self.hvm_nic_type))
4663
    if self.hvm_disk_type is not None:
4664
      instance.hvm_disk_type = self.hvm_disk_type
4665
      result.append(("hvm_disk_type", self.hvm_disk_type))
4666
    if self.hvm_cdrom_image_path:
4667
      if self.hvm_cdrom_image_path == constants.VALUE_NONE:
4668
        instance.hvm_cdrom_image_path = None
4669
      else:
4670
        instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
4671
      result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4672
    if self.vnc_bind_address:
4673
      instance.vnc_bind_address = self.vnc_bind_address
4674
      result.append(("vnc_bind_address", self.vnc_bind_address))
4675

    
4676
    self.cfg.Update(instance)
4677

    
4678
    return result
4679

    
4680

    
4681
class LUQueryExports(NoHooksLU):
4682
  """Query the exports list
4683

4684
  """
4685
  _OP_REQP = ['nodes']
4686
  REQ_BGL = False
4687

    
4688
  def ExpandNames(self):
4689
    self.needed_locks = {}
4690
    self.share_locks[locking.LEVEL_NODE] = 1
4691
    if not self.op.nodes:
4692
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4693
    else:
4694
      self.needed_locks[locking.LEVEL_NODE] = \
4695
        _GetWantedNodes(self, self.op.nodes)
4696

    
4697
  def CheckPrereq(self):
4698
    """Check prerequisites.
4699

4700
    """
4701
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
4702

    
4703
  def Exec(self, feedback_fn):
4704
    """Compute the list of all the exported system images.
4705

4706
    Returns:
4707
      a dictionary with the structure node->(export-list)
4708
      where export-list is a list of the instances exported on
4709
      that node.
4710

4711
    """
4712
    return rpc.call_export_list(self.nodes)
4713

    
4714

    
4715
class LUExportInstance(LogicalUnit):
4716
  """Export an instance to an image in the cluster.
4717

4718
  """
4719
  HPATH = "instance-export"
4720
  HTYPE = constants.HTYPE_INSTANCE
4721
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
4722
  REQ_BGL = False
4723

    
4724
  def ExpandNames(self):
4725
    self._ExpandAndLockInstance()
4726
    # FIXME: lock only instance primary and destination node
4727
    #
4728
    # Sad but true, for now we have do lock all nodes, as we don't know where
4729
    # the previous export might be, and and in this LU we search for it and
4730
    # remove it from its current node. In the future we could fix this by:
4731
    #  - making a tasklet to search (share-lock all), then create the new one,
4732
    #    then one to remove, after
4733
    #  - removing the removal operation altoghether
4734
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4735

    
4736
  def DeclareLocks(self, level):
4737
    """Last minute lock declaration."""
4738
    # All nodes are locked anyway, so nothing to do here.
4739

    
4740
  def BuildHooksEnv(self):
4741
    """Build hooks env.
4742

4743
    This will run on the master, primary node and target node.
4744

4745
    """
4746
    env = {
4747
      "EXPORT_NODE": self.op.target_node,
4748
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4749
      }
4750
    env.update(_BuildInstanceHookEnvByObject(self.instance))
4751
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4752
          self.op.target_node]
4753
    return env, nl, nl
4754

    
4755
  def CheckPrereq(self):
4756
    """Check prerequisites.
4757

4758
    This checks that the instance and node names are valid.
4759

4760
    """
4761
    instance_name = self.op.instance_name
4762
    self.instance = self.cfg.GetInstanceInfo(instance_name)
4763
    assert self.instance is not None, \
4764
          "Cannot retrieve locked instance %s" % self.op.instance_name
4765

    
4766
    self.dst_node = self.cfg.GetNodeInfo(
4767
      self.cfg.ExpandNodeName(self.op.target_node))
4768

    
4769
    assert self.dst_node is not None, \
4770
          "Cannot retrieve locked node %s" % self.op.target_node
4771

    
4772
    # instance disk type verification
4773
    for disk in self.instance.disks:
4774
      if disk.dev_type == constants.LD_FILE:
4775
        raise errors.OpPrereqError("Export not supported for instances with"
4776
                                   " file-based disks")
4777

    
4778
  def Exec(self, feedback_fn):
4779
    """Export an instance to an image in the cluster.
4780

4781
    """
4782
    instance = self.instance
4783
    dst_node = self.dst_node
4784
    src_node = instance.primary_node
4785
    if self.op.shutdown:
4786
      # shutdown the instance, but not the disks
4787
      if not rpc.call_instance_shutdown(src_node, instance):
4788
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4789
                                 (instance.name, src_node))
4790

    
4791
    vgname = self.cfg.GetVGName()
4792

    
4793
    snap_disks = []
4794

    
4795
    try:
4796
      for disk in instance.disks:
4797
        if disk.iv_name == "sda":
4798
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4799
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4800

    
4801
          if not new_dev_name:
4802
            logger.Error("could not snapshot block device %s on node %s" %
4803
                         (disk.logical_id[1], src_node))
4804
          else:
4805
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4806
                                      logical_id=(vgname, new_dev_name),
4807
                                      physical_id=(vgname, new_dev_name),
4808
                                      iv_name=disk.iv_name)
4809
            snap_disks.append(new_dev)
4810

    
4811
    finally:
4812
      if self.op.shutdown and instance.status == "up":
4813
        if not rpc.call_instance_start(src_node, instance, None):
4814
          _ShutdownInstanceDisks(instance, self.cfg)
4815
          raise errors.OpExecError("Could not start instance")
4816

    
4817
    # TODO: check for size
4818

    
4819
    for dev in snap_disks:
4820
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
4821
        logger.Error("could not export block device %s from node %s to node %s"
4822
                     % (dev.logical_id[1], src_node, dst_node.name))
4823
      if not rpc.call_blockdev_remove(src_node, dev):
4824
        logger.Error("could not remove snapshot block device %s from node %s" %
4825
                     (dev.logical_id[1], src_node))
4826

    
4827
    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4828
      logger.Error("could not finalize export for instance %s on node %s" %
4829
                   (instance.name, dst_node.name))
4830

    
4831
    nodelist = self.cfg.GetNodeList()
4832
    nodelist.remove(dst_node.name)
4833

    
4834
    # on one-node clusters nodelist will be empty after the removal
4835
    # if we proceed the backup would be removed because OpQueryExports
4836
    # substitutes an empty list with the full cluster node list.
4837
    if nodelist:
4838
      exportlist = rpc.call_export_list(nodelist)
4839
      for node in exportlist:
4840
        if instance.name in exportlist[node]:
4841
          if not rpc.call_export_remove(node, instance.name):
4842
            logger.Error("could not remove older export for instance %s"
4843
                         " on node %s" % (instance.name, node))
4844

    
4845

    
4846
class LURemoveExport(NoHooksLU):
4847
  """Remove exports related to the named instance.
4848

4849
  """
4850
  _OP_REQP = ["instance_name"]
4851
  REQ_BGL = False
4852

    
4853
  def ExpandNames(self):
4854
    self.needed_locks = {}
4855
    # We need all nodes to be locked in order for RemoveExport to work, but we
4856
    # don't need to lock the instance itself, as nothing will happen to it (and
4857
    # we can remove exports also for a removed instance)
4858
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4859

    
4860
  def CheckPrereq(self):
4861
    """Check prerequisites.
4862
    """
4863
    pass
4864

    
4865
  def Exec(self, feedback_fn):
4866
    """Remove any export.
4867

4868
    """
4869
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4870
    # If the instance was not found we'll try with the name that was passed in.
4871
    # This will only work if it was an FQDN, though.
4872
    fqdn_warn = False
4873
    if not instance_name:
4874
      fqdn_warn = True
4875
      instance_name = self.op.instance_name
4876

    
4877
    exportlist = rpc.call_export_list(self.acquired_locks[locking.LEVEL_NODE])
4878
    found = False
4879
    for node in exportlist:
4880
      if instance_name in exportlist[node]:
4881
        found = True
4882
        if not rpc.call_export_remove(node, instance_name):
4883
          logger.Error("could not remove export for instance %s"
4884
                       " on node %s" % (instance_name, node))
4885

    
4886
    if fqdn_warn and not found:
4887
      feedback_fn("Export not found. If trying to remove an export belonging"
4888
                  " to a deleted instance please use its Fully Qualified"
4889
                  " Domain Name.")
4890

    
4891

    
4892
class TagsLU(NoHooksLU):
4893
  """Generic tags LU.
4894

4895
  This is an abstract class which is the parent of all the other tags LUs.
4896

4897
  """
4898

    
4899
  def ExpandNames(self):
4900
    self.needed_locks = {}
4901
    if self.op.kind == constants.TAG_NODE:
4902
      name = self.cfg.ExpandNodeName(self.op.name)
4903
      if name is None:
4904
        raise errors.OpPrereqError("Invalid node name (%s)" %
4905
                                   (self.op.name,))
4906
      self.op.name = name
4907
      self.needed_locks[locking.LEVEL_NODE] = name
4908
    elif self.op.kind == constants.TAG_INSTANCE:
4909
      name = self.cfg.ExpandInstanceName(self.op.name)
4910
      if name is None:
4911
        raise errors.OpPrereqError("Invalid instance name (%s)" %
4912
                                   (self.op.name,))
4913
      self.op.name = name
4914
      self.needed_locks[locking.LEVEL_INSTANCE] = name
4915

    
4916
  def CheckPrereq(self):
4917
    """Check prerequisites.
4918

4919
    """
4920
    if self.op.kind == constants.TAG_CLUSTER:
4921
      self.target = self.cfg.GetClusterInfo()
4922
    elif self.op.kind == constants.TAG_NODE:
4923
      self.target = self.cfg.GetNodeInfo(self.op.name)
4924
    elif self.op.kind == constants.TAG_INSTANCE:
4925
      self.target = self.cfg.GetInstanceInfo(self.op.name)
4926
    else:
4927
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4928
                                 str(self.op.kind))
4929

    
4930

    
4931
class LUGetTags(TagsLU):
4932
  """Returns the tags of a given object.
4933

4934
  """
4935
  _OP_REQP = ["kind", "name"]
4936
  REQ_BGL = False
4937

    
4938
  def Exec(self, feedback_fn):
4939
    """Returns the tag list.
4940

4941
    """
4942
    return list(self.target.GetTags())
4943

    
4944

    
4945
class LUSearchTags(NoHooksLU):
4946
  """Searches the tags for a given pattern.
4947

4948
  """
4949
  _OP_REQP = ["pattern"]
4950
  REQ_BGL = False
4951

    
4952
  def ExpandNames(self):
4953
    self.needed_locks = {}
4954

    
4955
  def CheckPrereq(self):
4956
    """Check prerequisites.
4957

4958
    This checks the pattern passed for validity by compiling it.
4959

4960
    """
4961
    try:
4962
      self.re = re.compile(self.op.pattern)
4963
    except re.error, err:
4964
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4965
                                 (self.op.pattern, err))
4966

    
4967
  def Exec(self, feedback_fn):
4968
    """Returns the tag list.
4969

4970
    """
4971
    cfg = self.cfg
4972
    tgts = [("/cluster", cfg.GetClusterInfo())]
4973
    ilist = cfg.GetAllInstancesInfo().values()
4974
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4975
    nlist = cfg.GetAllNodesInfo().values()
4976
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4977
    results = []
4978
    for path, target in tgts:
4979
      for tag in target.GetTags():
4980
        if self.re.search(tag):
4981
          results.append((path, tag))
4982
    return results
4983

    
4984

    
4985
class LUAddTags(TagsLU):
4986
  """Sets a tag on a given object.
4987

4988
  """
4989
  _OP_REQP = ["kind", "name", "tags"]
4990
  REQ_BGL = False
4991

    
4992
  def CheckPrereq(self):
4993
    """Check prerequisites.
4994

4995
    This checks the type and length of the tag name and value.
4996

4997
    """
4998
    TagsLU.CheckPrereq(self)
4999
    for tag in self.op.tags:
5000
      objects.TaggableObject.ValidateTag(tag)
5001

    
5002
  def Exec(self, feedback_fn):
5003
    """Sets the tag.
5004

5005
    """
5006
    try:
5007
      for tag in self.op.tags:
5008
        self.target.AddTag(tag)
5009
    except errors.TagError, err:
5010
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
5011
    try:
5012
      self.cfg.Update(self.target)
5013
    except errors.ConfigurationError:
5014
      raise errors.OpRetryError("There has been a modification to the"
5015
                                " config file and the operation has been"
5016
                                " aborted. Please retry.")
5017

    
5018

    
5019
class LUDelTags(TagsLU):
5020
  """Delete a list of tags from a given object.
5021

5022
  """
5023
  _OP_REQP = ["kind", "name", "tags"]
5024
  REQ_BGL = False
5025

    
5026
  def CheckPrereq(self):
5027
    """Check prerequisites.
5028

5029
    This checks that we have the given tag.
5030

5031
    """
5032
    TagsLU.CheckPrereq(self)
5033
    for tag in self.op.tags:
5034
      objects.TaggableObject.ValidateTag(tag)
5035
    del_tags = frozenset(self.op.tags)
5036
    cur_tags = self.target.GetTags()
5037
    if not del_tags <= cur_tags:
5038
      diff_tags = del_tags - cur_tags
5039
      diff_names = ["'%s'" % tag for tag in diff_tags]
5040
      diff_names.sort()
5041
      raise errors.OpPrereqError("Tag(s) %s not found" %
5042
                                 (",".join(diff_names)))
5043

    
5044
  def Exec(self, feedback_fn):
5045
    """Remove the tag from the object.
5046

5047
    """
5048
    for tag in self.op.tags:
5049
      self.target.RemoveTag(tag)
5050
    try:
5051
      self.cfg.Update(self.target)
5052
    except errors.ConfigurationError:
5053
      raise errors.OpRetryError("There has been a modification to the"
5054
                                " config file and the operation has been"
5055
                                " aborted. Please retry.")
5056

    
5057

    
5058
class LUTestDelay(NoHooksLU):
5059
  """Sleep for a specified amount of time.
5060

5061
  This LU sleeps on the master and/or nodes for a specified amount of
5062
  time.
5063

5064
  """
5065
  _OP_REQP = ["duration", "on_master", "on_nodes"]
5066
  REQ_BGL = False
5067

    
5068
  def ExpandNames(self):
5069
    """Expand names and set required locks.
5070

5071
    This expands the node list, if any.
5072

5073
    """
5074
    self.needed_locks = {}
5075
    if self.op.on_nodes:
5076
      # _GetWantedNodes can be used here, but is not always appropriate to use
5077
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
5078
      # more information.
5079
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5080
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5081

    
5082
  def CheckPrereq(self):
5083
    """Check prerequisites.
5084

5085
    """
5086

    
5087
  def Exec(self, feedback_fn):
5088
    """Do the actual sleep.
5089

5090
    """
5091
    if self.op.on_master:
5092
      if not utils.TestDelay(self.op.duration):
5093
        raise errors.OpExecError("Error during master delay test")
5094
    if self.op.on_nodes:
5095
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5096
      if not result:
5097
        raise errors.OpExecError("Complete failure from rpc call")
5098
      for node, node_result in result.items():
5099
        if not node_result:
5100
          raise errors.OpExecError("Failure during rpc call to node %s,"
5101
                                   " result: %s" % (node, node_result))
5102

    
5103

    
5104
class IAllocator(object):
5105
  """IAllocator framework.
5106

5107
  An IAllocator instance has three sets of attributes:
5108
    - cfg/sstore that are needed to query the cluster
5109
    - input data (all members of the _KEYS class attribute are required)
5110
    - four buffer attributes (in|out_data|text), that represent the
5111
      input (to the external script) in text and data structure format,
5112
      and the output from it, again in two formats
5113
    - the result variables from the script (success, info, nodes) for
5114
      easy usage
5115

5116
  """
5117
  _ALLO_KEYS = [
5118
    "mem_size", "disks", "disk_template",
5119
    "os", "tags", "nics", "vcpus",
5120
    ]
5121
  _RELO_KEYS = [
5122
    "relocate_from",
5123
    ]
5124

    
5125
  def __init__(self, cfg, sstore, mode, name, **kwargs):
5126
    self.cfg = cfg
5127
    self.sstore = sstore
5128
    # init buffer variables
5129
    self.in_text = self.out_text = self.in_data = self.out_data = None
5130
    # init all input fields so that pylint is happy
5131
    self.mode = mode
5132
    self.name = name
5133
    self.mem_size = self.disks = self.disk_template = None
5134
    self.os = self.tags = self.nics = self.vcpus = None
5135
    self.relocate_from = None
5136
    # computed fields
5137
    self.required_nodes = None
5138
    # init result fields
5139
    self.success = self.info = self.nodes = None
5140
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5141
      keyset = self._ALLO_KEYS
5142
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5143
      keyset = self._RELO_KEYS
5144
    else:
5145
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5146
                                   " IAllocator" % self.mode)
5147
    for key in kwargs:
5148
      if key not in keyset:
5149
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
5150
                                     " IAllocator" % key)
5151
      setattr(self, key, kwargs[key])
5152
    for key in keyset:
5153
      if key not in kwargs:
5154
        raise errors.ProgrammerError("Missing input parameter '%s' to"
5155
                                     " IAllocator" % key)
5156
    self._BuildInputData()
5157

    
5158
  def _ComputeClusterData(self):
5159
    """Compute the generic allocator input data.
5160

5161
    This is the data that is independent of the actual operation.
5162

5163
    """
5164
    cfg = self.cfg
5165
    # cluster data
5166
    data = {
5167
      "version": 1,
5168
      "cluster_name": self.sstore.GetClusterName(),
5169
      "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
5170
      "hypervisor_type": self.sstore.GetHypervisorType(),
5171
      # we don't have job IDs
5172
      }
5173

    
5174
    i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
5175

    
5176
    # node data
5177
    node_results = {}
5178
    node_list = cfg.GetNodeList()
5179
    node_data = rpc.call_node_info(node_list, cfg.GetVGName())
5180
    for nname in node_list:
5181
      ninfo = cfg.GetNodeInfo(nname)
5182
      if nname not in node_data or not isinstance(node_data[nname], dict):
5183
        raise errors.OpExecError("Can't get data for node %s" % nname)
5184
      remote_info = node_data[nname]
5185
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
5186
                   'vg_size', 'vg_free', 'cpu_total']:
5187
        if attr not in remote_info:
5188
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5189
                                   (nname, attr))
5190
        try:
5191
          remote_info[attr] = int(remote_info[attr])
5192
        except ValueError, err:
5193
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5194
                                   " %s" % (nname, attr, str(err)))
5195
      # compute memory used by primary instances
5196
      i_p_mem = i_p_up_mem = 0
5197
      for iinfo in i_list:
5198
        if iinfo.primary_node == nname:
5199
          i_p_mem += iinfo.memory
5200
          if iinfo.status == "up":
5201
            i_p_up_mem += iinfo.memory
5202

    
5203
      # compute memory used by instances
5204
      pnr = {
5205
        "tags": list(ninfo.GetTags()),
5206
        "total_memory": remote_info['memory_total'],
5207
        "reserved_memory": remote_info['memory_dom0'],
5208
        "free_memory": remote_info['memory_free'],
5209
        "i_pri_memory": i_p_mem,
5210
        "i_pri_up_memory": i_p_up_mem,
5211
        "total_disk": remote_info['vg_size'],
5212
        "free_disk": remote_info['vg_free'],
5213
        "primary_ip": ninfo.primary_ip,
5214
        "secondary_ip": ninfo.secondary_ip,
5215
        "total_cpus": remote_info['cpu_total'],
5216
        }
5217
      node_results[nname] = pnr
5218
    data["nodes"] = node_results
5219

    
5220
    # instance data
5221
    instance_data = {}
5222
    for iinfo in i_list:
5223
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5224
                  for n in iinfo.nics]
5225
      pir = {
5226
        "tags": list(iinfo.GetTags()),
5227
        "should_run": iinfo.status == "up",
5228
        "vcpus": iinfo.vcpus,
5229
        "memory": iinfo.memory,
5230
        "os": iinfo.os,
5231
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5232
        "nics": nic_data,
5233
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5234
        "disk_template": iinfo.disk_template,
5235
        }
5236
      instance_data[iinfo.name] = pir
5237

    
5238
    data["instances"] = instance_data
5239

    
5240
    self.in_data = data
5241

    
5242
  def _AddNewInstance(self):
5243
    """Add new instance data to allocator structure.
5244

5245
    This in combination with _AllocatorGetClusterData will create the
5246
    correct structure needed as input for the allocator.
5247

5248
    The checks for the completeness of the opcode must have already been
5249
    done.
5250

5251
    """
5252
    data = self.in_data
5253
    if len(self.disks) != 2:
5254
      raise errors.OpExecError("Only two-disk configurations supported")
5255

    
5256
    disk_space = _ComputeDiskSize(self.disk_template,
5257
                                  self.disks[0]["size"], self.disks[1]["size"])
5258

    
5259
    if self.disk_template in constants.DTS_NET_MIRROR:
5260
      self.required_nodes = 2
5261
    else:
5262
      self.required_nodes = 1
5263
    request = {
5264
      "type": "allocate",
5265
      "name": self.name,
5266
      "disk_template": self.disk_template,
5267
      "tags": self.tags,
5268
      "os": self.os,
5269
      "vcpus": self.vcpus,
5270
      "memory": self.mem_size,
5271
      "disks": self.disks,
5272
      "disk_space_total": disk_space,
5273
      "nics": self.nics,
5274
      "required_nodes": self.required_nodes,
5275
      }
5276
    data["request"] = request
5277

    
5278
  def _AddRelocateInstance(self):
5279
    """Add relocate instance data to allocator structure.
5280

5281
    This in combination with _IAllocatorGetClusterData will create the
5282
    correct structure needed as input for the allocator.
5283

5284
    The checks for the completeness of the opcode must have already been
5285
    done.
5286

5287
    """
5288
    instance = self.cfg.GetInstanceInfo(self.name)
5289
    if instance is None:
5290
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
5291
                                   " IAllocator" % self.name)
5292

    
5293
    if instance.disk_template not in constants.DTS_NET_MIRROR:
5294
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5295

    
5296
    if len(instance.secondary_nodes) != 1:
5297
      raise errors.OpPrereqError("Instance has not exactly one secondary node")
5298

    
5299
    self.required_nodes = 1
5300

    
5301
    disk_space = _ComputeDiskSize(instance.disk_template,
5302
                                  instance.disks[0].size,
5303
                                  instance.disks[1].size)
5304

    
5305
    request = {
5306
      "type": "relocate",
5307
      "name": self.name,
5308
      "disk_space_total": disk_space,
5309
      "required_nodes": self.required_nodes,
5310
      "relocate_from": self.relocate_from,
5311
      }
5312
    self.in_data["request"] = request
5313

    
5314
  def _BuildInputData(self):
5315
    """Build input data structures.
5316

5317
    """
5318
    self._ComputeClusterData()
5319

    
5320
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5321
      self._AddNewInstance()
5322
    else:
5323
      self._AddRelocateInstance()
5324

    
5325
    self.in_text = serializer.Dump(self.in_data)
5326

    
5327
  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
5328
    """Run an instance allocator and return the results.
5329

5330
    """
5331
    data = self.in_text
5332

    
5333
    result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
5334

    
5335
    if not isinstance(result, (list, tuple)) or len(result) != 4:
5336
      raise errors.OpExecError("Invalid result from master iallocator runner")
5337

    
5338
    rcode, stdout, stderr, fail = result
5339

    
5340
    if rcode == constants.IARUN_NOTFOUND:
5341
      raise errors.OpExecError("Can't find allocator '%s'" % name)
5342
    elif rcode == constants.IARUN_FAILURE:
5343
      raise errors.OpExecError("Instance allocator call failed: %s,"
5344
                               " output: %s" % (fail, stdout+stderr))
5345
    self.out_text = stdout
5346
    if validate:
5347
      self._ValidateResult()
5348

    
5349
  def _ValidateResult(self):
5350
    """Process the allocator results.
5351

5352
    This will process and if successful save the result in
5353
    self.out_data and the other parameters.
5354

5355
    """
5356
    try:
5357
      rdict = serializer.Load(self.out_text)
5358
    except Exception, err:
5359
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5360

    
5361
    if not isinstance(rdict, dict):
5362
      raise errors.OpExecError("Can't parse iallocator results: not a dict")
5363

    
5364
    for key in "success", "info", "nodes":
5365
      if key not in rdict:
5366
        raise errors.OpExecError("Can't parse iallocator results:"
5367
                                 " missing key '%s'" % key)
5368
      setattr(self, key, rdict[key])
5369

    
5370
    if not isinstance(rdict["nodes"], list):
5371
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5372
                               " is not a list")
5373
    self.out_data = rdict
5374

    
5375

    
5376
class LUTestAllocator(NoHooksLU):
5377
  """Run allocator tests.
5378

5379
  This LU runs the allocator tests
5380

5381
  """
5382
  _OP_REQP = ["direction", "mode", "name"]
5383

    
5384
  def CheckPrereq(self):
5385
    """Check prerequisites.
5386

5387
    This checks the opcode parameters depending on the director and mode test.
5388

5389
    """
5390
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5391
      for attr in ["name", "mem_size", "disks", "disk_template",
5392
                   "os", "tags", "nics", "vcpus"]:
5393
        if not hasattr(self.op, attr):
5394
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5395
                                     attr)
5396
      iname = self.cfg.ExpandInstanceName(self.op.name)
5397
      if iname is not None:
5398
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5399
                                   iname)
5400
      if not isinstance(self.op.nics, list):
5401
        raise errors.OpPrereqError("Invalid parameter 'nics'")
5402
      for row in self.op.nics:
5403
        if (not isinstance(row, dict) or
5404
            "mac" not in row or
5405
            "ip" not in row or
5406
            "bridge" not in row):
5407
          raise errors.OpPrereqError("Invalid contents of the"
5408
                                     " 'nics' parameter")
5409
      if not isinstance(self.op.disks, list):
5410
        raise errors.OpPrereqError("Invalid parameter 'disks'")
5411
      if len(self.op.disks) != 2:
5412
        raise errors.OpPrereqError("Only two-disk configurations supported")
5413
      for row in self.op.disks:
5414
        if (not isinstance(row, dict) or
5415
            "size" not in row or
5416
            not isinstance(row["size"], int) or
5417
            "mode" not in row or
5418
            row["mode"] not in ['r', 'w']):
5419
          raise errors.OpPrereqError("Invalid contents of the"
5420
                                     " 'disks' parameter")
5421
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5422
      if not hasattr(self.op, "name"):
5423
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5424
      fname = self.cfg.ExpandInstanceName(self.op.name)
5425
      if fname is None:
5426
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5427
                                   self.op.name)
5428
      self.op.name = fname
5429
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5430
    else:
5431
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5432
                                 self.op.mode)
5433

    
5434
    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5435
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
5436
        raise errors.OpPrereqError("Missing allocator name")
5437
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5438
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
5439
                                 self.op.direction)
5440

    
5441
  def Exec(self, feedback_fn):
5442
    """Run the allocator test.
5443

5444
    """
5445
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5446
      ial = IAllocator(self.cfg, self.sstore,
5447
                       mode=self.op.mode,
5448
                       name=self.op.name,
5449
                       mem_size=self.op.mem_size,
5450
                       disks=self.op.disks,
5451
                       disk_template=self.op.disk_template,
5452
                       os=self.op.os,
5453
                       tags=self.op.tags,
5454
                       nics=self.op.nics,
5455
                       vcpus=self.op.vcpus,
5456
                       )
5457
    else:
5458
      ial = IAllocator(self.cfg, self.sstore,
5459
                       mode=self.op.mode,
5460
                       name=self.op.name,
5461
                       relocate_from=list(self.relocate_from),
5462
                       )
5463

    
5464
    if self.op.direction == constants.IALLOCATOR_DIR_IN:
5465
      result = ial.in_text
5466
    else:
5467
      ial.Run(self.op.allocator, validate=False)
5468
      result = ial.out_text
5469
    return result