root / lib / cmdlib.py @ 1ce4bbe3

#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform
import logging

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_MASTER: the LU needs to run on the master node
        REQ_WSSTORE: the LU needs a writable SimpleStore
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_MASTER = True
  REQ_WSSTORE = False
  REQ_BGL = True

  def __init__(self, processor, op, context, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.sstore = sstore
    self.context = context
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    if not self.cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    if self.REQ_MASTER:
      master = sstore.GetMasterNode()
      if master != utils.HostInfo().name:
        raise errors.OpPrereqError("Commands must be run on the master"
                                   " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.sstore)
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:
      - Use an empty dict if you don't need any lock
      - If you don't need any lock at a particular level omit that level
      - Don't put anything for the BGL level
      - If you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples:
    # Acquire all nodes and one instance
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
    }
    # Acquire just two nodes
    self.needed_locks = {
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
    }
    # Acquire no locks
    self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """
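  # Illustrative sketch (not from the original file): a concurrent LU that
  # locks instances in ExpandNames and defers node locking would typically
  # override DeclareLocks along these lines, using the _LockInstancesNodes
  # helper defined further below:
  #
  #   def DeclareLocks(self, level):
  #     if level == locking.LEVEL_NODE:
  #       self._LockInstancesNodes()
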
  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError
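  # Illustrative sketch (not from the original file): a minimal BuildHooksEnv
  # for a hypothetical node-level LU returns the environment dict plus the
  # pre- and post-execution node lists, mirroring the pattern used by the
  # concrete LUs later in this module:
  #
  #   def BuildHooksEnv(self):
  #     env = {"OP_TARGET": self.op.node_name, "NODE_NAME": self.op.node_name}
  #     mn = self.sstore.GetMasterNode()
  #     return env, [mn], [mn, self.op.node_name]
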
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    Args:
      phase: the hooks phase that has just been run
      hook_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: the previous result this LU had, or None in the PRE phase.

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name
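  # Illustrative sketch (not from the original file): an instance-level LU
  # would typically call this helper from its ExpandNames and then ask for
  # the node locks to be recalculated once the instance lock is held:
  #
  #   def ExpandNames(self):
  #     self._ExpandAndLockInstance()
  #     self.needed_locks[locking.LEVEL_NODE] = []
  #     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
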
  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to:

    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


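# Illustrative sketch (not from the original file): a hypothetical NoHooksLU
# subclass only needs to provide CheckPrereq and Exec, e.g.:
#
#   class LUHypotheticalNoop(NoHooksLU):
#     _OP_REQP = []
#
#     def CheckPrereq(self):
#       pass
#
#     def Exec(self, feedback_fn):
#       return True
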
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))


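# Illustrative sketch (not from the original file): callers pass the static
# and dynamic field sets together with the user-selected fields, as done by
# the query LUs later in this module, e.g.:
#
#   _CheckOutputFields(static=["node"],
#                      dynamic=["phys", "vg", "name", "size", "instance"],
#                      selected=self.op.output_fields)
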
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings
  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env

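# Illustrative sketch (not from the original file; names and values are
# hypothetical): for a single-NIC instance the helper above produces an
# environment along the lines of:
#
#   {"OP_TARGET": "inst1", "INSTANCE_NAME": "inst1",
#    "INSTANCE_PRIMARY": "node1", "INSTANCE_SECONDARIES": "node2",
#    "INSTANCE_OS_TYPE": "debian-etch", "INSTANCE_STATUS": "up",
#    "INSTANCE_MEMORY": 512, "INSTANCE_VCPUS": 1,
#    "INSTANCE_NIC0_IP": "", "INSTANCE_NIC0_BRIDGE": "br0",
#    "INSTANCE_NIC0_HWADDR": "aa:00:00:11:22:33",
#    "INSTANCE_NIC_COUNT": 1}
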
def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override
  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s does not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result['node-net-test'][node]))

    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if not instanceconfig.status == 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          needed_mem += instance_cfg[instance].memory
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo]
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  This is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result, handle it, and send some
    nicely-formatted feedback back to the user.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: previous Exec result

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn("    Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]
  REQ_WSSTORE = True

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      if not rpc.call_node_start_master(master, False):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    # FIXME: This only works because there is only one parameter that can be
    # changed or removed.
    if not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      node_list = self.acquired_locks[locking.LEVEL_NODE]
      vglist = rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name != self.cfg.GetVGName():
      self.cfg.SetVGName(self.op.vg_name)
    else:
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")


def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  cfgw.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
    _CheckOutputFields(static=[],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

      Args:
        node_list: a list with the names of all nodes
        rlist: a map with node names as keys and OS objects as values

      Returns:
        map: a map with osnames as keys and as value another map, with
             nodes as
             keys and list of OS objects as values
             e.g. {"debian-etch": {"node1": [<object>,...],
                                   "node2": [<object>,]}
                  }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if not nr:
        continue
      for os_obj in nr:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.acquired_locks[locking.LEVEL_NODE]
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    self.context.RemoveNode(node.name)

    rpc.call_node_leave_cluster(node.name)


class LUQueryNodes(NoHooksLU):
1408
  """Logical unit for querying nodes.
1409

1410
  """
1411
  _OP_REQP = ["output_fields", "names"]
1412
  REQ_BGL = False
1413

    
1414
  def ExpandNames(self):
1415
    self.dynamic_fields = frozenset([
1416
      "dtotal", "dfree",
1417
      "mtotal", "mnode", "mfree",
1418
      "bootid",
1419
      "ctotal",
1420
      ])
1421

    
1422
    self.static_fields = frozenset([
1423
      "name", "pinst_cnt", "sinst_cnt",
1424
      "pinst_list", "sinst_list",
1425
      "pip", "sip", "tags",
1426
      ])
1427

    
1428
    _CheckOutputFields(static=self.static_fields,
1429
                       dynamic=self.dynamic_fields,
1430
                       selected=self.op.output_fields)
1431

    
1432
    self.needed_locks = {}
1433
    self.share_locks[locking.LEVEL_NODE] = 1
1434

    
1435
    if self.op.names:
1436
      self.wanted = _GetWantedNodes(self, self.op.names)
1437
    else:
1438
      self.wanted = locking.ALL_SET
1439

    
1440
    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
1441
    if self.do_locking:
1442
      # if we don't request only static fields, we need to lock the nodes
1443
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
1444

    
1445

    
1446
  def CheckPrereq(self):
1447
    """Check prerequisites.
1448

1449
    """
1450
    # The validation of the node list is done in the _GetWantedNodes,
1451
    # if non empty, and if empty, there's no validation to do
1452
    pass
1453

    
1454
  def Exec(self, feedback_fn):
1455
    """Computes the list of nodes and their attributes.
1456

1457
    """
1458
    all_info = self.cfg.GetAllNodesInfo()
1459
    if self.do_locking:
1460
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1461
    elif self.wanted != locking.ALL_SET:
1462
      nodenames = self.wanted
1463
      missing = set(nodenames).difference(all_info.keys())
1464
      if missing:
1465
        raise self.OpExecError(
1466
          "Some nodes were removed before retrieving their data: %s" % missing)
1467
    else:
1468
      nodenames = all_info.keys()
1469
    nodelist = [all_info[name] for name in nodenames]
1470

    
1471
    # begin data gathering
1472

    
1473
    if self.dynamic_fields.intersection(self.op.output_fields):
1474
      live_data = {}
1475
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1476
      for name in nodenames:
1477
        nodeinfo = node_data.get(name, None)
1478
        if nodeinfo:
1479
          live_data[name] = {
1480
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1481
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1482
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1483
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1484
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1485
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1486
            "bootid": nodeinfo['bootid'],
1487
            }
1488
        else:
1489
          live_data[name] = {}
1490
    else:
1491
      live_data = dict.fromkeys(nodenames, {})
1492

    
1493
    node_to_primary = dict([(name, set()) for name in nodenames])
1494
    node_to_secondary = dict([(name, set()) for name in nodenames])
1495

    
1496
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1497
                             "sinst_cnt", "sinst_list"))
1498
    if inst_fields & frozenset(self.op.output_fields):
1499
      instancelist = self.cfg.GetInstanceList()
1500

    
1501
      for instance_name in instancelist:
1502
        inst = self.cfg.GetInstanceInfo(instance_name)
1503
        if inst.primary_node in node_to_primary:
1504
          node_to_primary[inst.primary_node].add(inst.name)
1505
        for secnode in inst.secondary_nodes:
1506
          if secnode in node_to_secondary:
1507
            node_to_secondary[secnode].add(inst.name)
1508

    
1509
    # end data gathering
1510

    
1511
    output = []
1512
    for node in nodelist:
1513
      node_output = []
1514
      for field in self.op.output_fields:
1515
        if field == "name":
1516
          val = node.name
1517
        elif field == "pinst_list":
1518
          val = list(node_to_primary[node.name])
1519
        elif field == "sinst_list":
1520
          val = list(node_to_secondary[node.name])
1521
        elif field == "pinst_cnt":
1522
          val = len(node_to_primary[node.name])
1523
        elif field == "sinst_cnt":
1524
          val = len(node_to_secondary[node.name])
1525
        elif field == "pip":
1526
          val = node.primary_ip
1527
        elif field == "sip":
1528
          val = node.secondary_ip
1529
        elif field == "tags":
1530
          val = list(node.GetTags())
1531
        elif field in self.dynamic_fields:
1532
          val = live_data[node.name].get(field, None)
1533
        else:
1534
          raise errors.ParameterError(field)
1535
        node_output.append(val)
1536
      output.append(node_output)
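      # each row holds the values of the requested output_fields in order;
      # e.g. for output_fields ["name", "pinst_cnt"] a row might look like
      # ["node1.example.com", 2] (hypothetical values)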
1537

    
1538
    return output
1539

    
1540

    
1541
class LUQueryNodeVolumes(NoHooksLU):
1542
  """Logical unit for getting volumes on node(s).
1543

1544
  """
1545
  _OP_REQP = ["nodes", "output_fields"]
1546
  REQ_BGL = False
1547

    
1548
  def ExpandNames(self):
1549
    _CheckOutputFields(static=["node"],
1550
                       dynamic=["phys", "vg", "name", "size", "instance"],
1551
                       selected=self.op.output_fields)
1552

    
1553
    self.needed_locks = {}
1554
    self.share_locks[locking.LEVEL_NODE] = 1
1555
    if not self.op.nodes:
1556
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1557
    else:
1558
      self.needed_locks[locking.LEVEL_NODE] = \
1559
        _GetWantedNodes(self, self.op.nodes)
1560

    
1561
  def CheckPrereq(self):
1562
    """Check prerequisites.
1563

1564
    This checks that the fields required are valid output fields.
1565

1566
    """
1567
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1568

    
1569
  def Exec(self, feedback_fn):
1570
    """Computes the list of nodes and their attributes.
1571

1572
    """
1573
    nodenames = self.nodes
1574
    volumes = rpc.call_node_volumes(nodenames)
1575

    
1576
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1577
             in self.cfg.GetInstanceList()]
1578

    
1579
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
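    # lv_by_node maps each instance to its per-node LV name lists; the
    # "instance" output field below is resolved by looking the volume name
    # up in this mapping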
1580

    
1581
    output = []
1582
    for node in nodenames:
1583
      if node not in volumes or not volumes[node]:
1584
        continue
1585

    
1586
      node_vols = volumes[node][:]
1587
      node_vols.sort(key=lambda vol: vol['dev'])
1588

    
1589
      for vol in node_vols:
1590
        node_output = []
1591
        for field in self.op.output_fields:
1592
          if field == "node":
1593
            val = node
1594
          elif field == "phys":
1595
            val = vol['dev']
1596
          elif field == "vg":
1597
            val = vol['vg']
1598
          elif field == "name":
1599
            val = vol['name']
1600
          elif field == "size":
1601
            val = int(float(vol['size']))
1602
          elif field == "instance":
1603
            for inst in ilist:
1604
              if node not in lv_by_node[inst]:
1605
                continue
1606
              if vol['name'] in lv_by_node[inst][node]:
1607
                val = inst.name
1608
                break
1609
            else:
1610
              val = '-'
1611
          else:
1612
            raise errors.ParameterError(field)
1613
          node_output.append(str(val))
1614

    
1615
        output.append(node_output)
1616

    
1617
    return output
1618

    
1619

    
1620
class LUAddNode(LogicalUnit):
1621
  """Logical unit for adding node to the cluster.
1622

1623
  """
1624
  HPATH = "node-add"
1625
  HTYPE = constants.HTYPE_NODE
1626
  _OP_REQP = ["node_name"]
1627

    
1628
  def BuildHooksEnv(self):
1629
    """Build hooks env.
1630

1631
    This will run on all nodes before, and on all nodes + the new node after.
1632

1633
    """
1634
    env = {
1635
      "OP_TARGET": self.op.node_name,
1636
      "NODE_NAME": self.op.node_name,
1637
      "NODE_PIP": self.op.primary_ip,
1638
      "NODE_SIP": self.op.secondary_ip,
1639
      }
1640
    nodes_0 = self.cfg.GetNodeList()
1641
    nodes_1 = nodes_0 + [self.op.node_name, ]
1642
    return env, nodes_0, nodes_1
1643

    
1644
  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
1656
    cfg = self.cfg
1657

    
1658
    dns_data = utils.HostInfo(node_name)
1659

    
1660
    node = dns_data.name
1661
    primary_ip = self.op.primary_ip = dns_data.ip
1662
    secondary_ip = getattr(self.op, "secondary_ip", None)
1663
    if secondary_ip is None:
1664
      secondary_ip = primary_ip
1665
    if not utils.IsValidIP(secondary_ip):
1666
      raise errors.OpPrereqError("Invalid secondary IP given")
1667
    self.op.secondary_ip = secondary_ip
1668

    
1669
    node_list = cfg.GetNodeList()
1670
    if not self.op.readd and node in node_list:
1671
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1672
                                 node)
1673
    elif self.op.readd and node not in node_list:
1674
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1675

    
1676
    for existing_node_name in node_list:
1677
      existing_node = cfg.GetNodeInfo(existing_node_name)
1678

    
1679
      if self.op.readd and node == existing_node_name:
1680
        if (existing_node.primary_ip != primary_ip or
1681
            existing_node.secondary_ip != secondary_ip):
1682
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1683
                                     " address configuration as before")
1684
        continue
1685

    
1686
      if (existing_node.primary_ip == primary_ip or
1687
          existing_node.secondary_ip == primary_ip or
1688
          existing_node.primary_ip == secondary_ip or
1689
          existing_node.secondary_ip == secondary_ip):
1690
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1691
                                   " existing node %s" % existing_node.name)
1692

    
1693
    # check that the type of the node (single versus dual homed) is the
1694
    # same as for the master
1695
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1696
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1697
    newbie_singlehomed = secondary_ip == primary_ip
1698
    if master_singlehomed != newbie_singlehomed:
1699
      if master_singlehomed:
1700
        raise errors.OpPrereqError("The master has no private ip but the"
1701
                                   " new node has one")
1702
      else:
1703
        raise errors.OpPrereqError("The master has a private ip but the"
1704
                                   " new node doesn't have one")
1705

    
1706
    # check reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1708
      raise errors.OpPrereqError("Node not reachable by ping")
1709

    
1710
    if not newbie_singlehomed:
1711
      # check reachability from my secondary ip to newbie's secondary ip
1712
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1713
                           source=myself.secondary_ip):
1714
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1715
                                   " based ping to noded port")
1716

    
1717
    self.new_node = objects.Node(name=node,
1718
                                 primary_ip=primary_ip,
1719
                                 secondary_ip=secondary_ip)
1720

    
1721
  def Exec(self, feedback_fn):
1722
    """Adds the new node to the cluster.
1723

1724
    """
1725
    new_node = self.new_node
1726
    node = new_node.name
1727

    
1728
    # check connectivity
1729
    result = rpc.call_version([node])[node]
1730
    if result:
1731
      if constants.PROTOCOL_VERSION == result:
1732
        logger.Info("communication to node %s fine, sw version %s match" %
1733
                    (node, result))
1734
      else:
1735
        raise errors.OpExecError("Version mismatch master version %s,"
1736
                                 " node version %s" %
1737
                                 (constants.PROTOCOL_VERSION, result))
1738
    else:
1739
      raise errors.OpExecError("Cannot get version from the new node")
1740

    
1741
    # setup ssh on node
1742
    logger.Info("copy ssh key to node %s" % node)
1743
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1744
    keyarray = []
1745
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1746
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1747
                priv_key, pub_key]
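    # keyfiles lists the host DSA/RSA keypairs plus the ssh keypair of the
    # user Ganeti runs as; they are read below and pushed verbatim to the
    # new node via rpc.call_node_add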
1748

    
1749
    for i in keyfiles:
1750
      f = open(i, 'r')
1751
      try:
1752
        keyarray.append(f.read())
1753
      finally:
1754
        f.close()
1755

    
1756
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1757
                               keyarray[3], keyarray[4], keyarray[5])
1758

    
1759
    if not result:
1760
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1761

    
1762
    # Add node to our /etc/hosts, and add key to known_hosts
1763
    utils.AddHostToEtcHosts(new_node.name)
1764

    
1765
    if new_node.secondary_ip != new_node.primary_ip:
1766
      if not rpc.call_node_tcp_ping(new_node.name,
1767
                                    constants.LOCALHOST_IP_ADDRESS,
1768
                                    new_node.secondary_ip,
1769
                                    constants.DEFAULT_NODED_PORT,
1770
                                    10, False):
1771
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1772
                                 " you gave (%s). Please fix and re-run this"
1773
                                 " command." % new_node.secondary_ip)
1774

    
1775
    node_verify_list = [self.sstore.GetMasterNode()]
1776
    node_verify_param = {
1777
      'nodelist': [node],
1778
      # TODO: do a node-net-test as well?
1779
    }
1780

    
1781
    result = rpc.call_node_verify(node_verify_list, node_verify_param)
1782
    for verifier in node_verify_list:
1783
      if not result[verifier]:
1784
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
1785
                                 " for remote verification" % verifier)
1786
      if result[verifier]['nodelist']:
1787
        for failed in result[verifier]['nodelist']:
1788
          feedback_fn("ssh/hostname verification failed %s -> %s" %
1789
                      (verifier, result[verifier]['nodelist'][failed]))
1790
        raise errors.OpExecError("ssh/hostname verification failed.")
1791

    
1792
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1793
    # including the node just added
1794
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1795
    dist_nodes = self.cfg.GetNodeList()
1796
    if not self.op.readd:
1797
      dist_nodes.append(node)
1798
    if myself.name in dist_nodes:
1799
      dist_nodes.remove(myself.name)
1800

    
1801
    logger.Debug("Copying hosts and known_hosts to all nodes")
1802
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1803
      result = rpc.call_upload_file(dist_nodes, fname)
1804
      for to_node in dist_nodes:
1805
        if not result[to_node]:
1806
          logger.Error("copy of file %s to node %s failed" %
1807
                       (fname, to_node))
1808

    
1809
    to_copy = self.sstore.GetFileList()
1810
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1811
      to_copy.append(constants.VNC_PASSWORD_FILE)
1812
    for fname in to_copy:
1813
      result = rpc.call_upload_file([node], fname)
1814
      if not result[node]:
1815
        logger.Error("could not copy file %s to node %s" % (fname, node))
1816

    
1817
    if self.op.readd:
1818
      self.context.ReaddNode(new_node)
1819
    else:
1820
      self.context.AddNode(new_node)
1821

    
1822

    
1823
class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      }

    return result


class LUDumpClusterConfig(NoHooksLU):
1860
  """Return a text-representation of the cluster-config.
1861

1862
  """
1863
  _OP_REQP = []
1864
  REQ_BGL = False
1865

    
1866
  def ExpandNames(self):
1867
    self.needed_locks = {}
1868

    
1869
  def CheckPrereq(self):
1870
    """No prerequisites.
1871

1872
    """
1873
    pass
1874

    
1875
  def Exec(self, feedback_fn):
1876
    """Dump a representation of the cluster config to the standard output.
1877

1878
    """
1879
    return self.cfg.DumpConfig()
1880

    
1881

    
1882
class LUActivateInstanceDisks(NoHooksLU):
1883
  """Bring up an instance's disks.
1884

1885
  """
1886
  _OP_REQP = ["instance_name"]
1887
  REQ_BGL = False
1888

    
1889
  def ExpandNames(self):
1890
    self._ExpandAndLockInstance()
1891
    self.needed_locks[locking.LEVEL_NODE] = []
1892
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1893

    
1894
  def DeclareLocks(self, level):
1895
    if level == locking.LEVEL_NODE:
1896
      self._LockInstancesNodes()
1897

    
1898
  def CheckPrereq(self):
1899
    """Check prerequisites.
1900

1901
    This checks that the instance is in the cluster.
1902

1903
    """
1904
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1905
    assert self.instance is not None, \
1906
      "Cannot retrieve locked instance %s" % self.op.instance_name
1907

    
1908
  def Exec(self, feedback_fn):
1909
    """Activate the disks.
1910

1911
    """
1912
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1913
    if not disks_ok:
1914
      raise errors.OpExecError("Cannot activate block devices")
1915

    
1916
    return disks_info
1917

    
1918

    
1919
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
         succeeded, with the mapping from node devices to instance devices
  """
  device_info = []
1935
  disks_ok = True
1936
  iname = instance.name
1937
  # With the two-pass mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking has occurred, but we do not eliminate it

    
1941
  # The proper fix would be to wait (with some limits) until the
1942
  # connection has been made and drbd transitions from WFConnection
1943
  # into any other network-connected state (Connected, SyncTarget,
1944
  # SyncSource, etc.)
1945

    
1946
  # 1st pass, assemble on all nodes in secondary mode
1947
  for inst_disk in instance.disks:
1948
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1949
      cfg.SetDiskID(node_disk, node)
1950
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1951
      if not result:
1952
        logger.Error("could not prepare block device %s on node %s"
1953
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1954
        if not ignore_secondaries:
1955
          disks_ok = False
1956

    
1957
  # FIXME: race condition on drbd migration to primary
1958

    
1959
  # 2nd pass, do only the primary node
1960
  for inst_disk in instance.disks:
1961
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1962
      if node != instance.primary_node:
1963
        continue
1964
      cfg.SetDiskID(node_disk, node)
1965
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1966
      if not result:
1967
        logger.Error("could not prepare block device %s on node %s"
1968
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1969
        disks_ok = False
1970
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
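    # device_info collects one (primary_node, iv_name, result of the primary
    # assemble call) entry per disk; together with disks_ok this is what
    # callers such as LUActivateInstanceDisks receive back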
1971

    
1972
  # leave the disks configured for the primary node
1973
  # this is a workaround that would be fixed better by
1974
  # improving the logical/physical id handling
1975
  for disk in instance.disks:
1976
    cfg.SetDiskID(disk, instance.primary_node)
1977

    
1978
  return disks_ok, device_info
1979

    
1980

    
1981
def _StartInstanceDisks(cfg, instance, force):
1982
  """Start the disks of an instance.
1983

1984
  """
1985
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1986
                                           ignore_secondaries=force)
1987
  if not disks_ok:
1988
    _ShutdownInstanceDisks(instance, cfg)
1989
    if force is not None and not force:
1990
      logger.Error("If the message above refers to a secondary node,"
1991
                   " you can retry the operation using '--force'.")
1992
    raise errors.OpExecError("Disk consistency error")
1993

    
1994

    
1995
class LUDeactivateInstanceDisks(NoHooksLU):
1996
  """Shutdown an instance's disks.
1997

1998
  """
1999
  _OP_REQP = ["instance_name"]
2000
  REQ_BGL = False
2001

    
2002
  def ExpandNames(self):
2003
    self._ExpandAndLockInstance()
2004
    self.needed_locks[locking.LEVEL_NODE] = []
2005
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2006

    
2007
  def DeclareLocks(self, level):
2008
    if level == locking.LEVEL_NODE:
2009
      self._LockInstancesNodes()
2010

    
2011
  def CheckPrereq(self):
2012
    """Check prerequisites.
2013

2014
    This checks that the instance is in the cluster.
2015

2016
    """
2017
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2018
    assert self.instance is not None, \
2019
      "Cannot retrieve locked instance %s" % self.op.instance_name
2020

    
2021
  def Exec(self, feedback_fn):
2022
    """Deactivate the disks
2023

2024
    """
2025
    instance = self.instance
2026
    _SafeShutdownInstanceDisks(instance, self.cfg)
2027

    
2028

    
2029
def _SafeShutdownInstanceDisks(instance, cfg):
2030
  """Shutdown block devices of an instance.
2031

2032
  This function checks if an instance is running, before calling
2033
  _ShutdownInstanceDisks.
2034

2035
  """
2036
  ins_l = rpc.call_instance_list([instance.primary_node])
2037
  ins_l = ins_l[instance.primary_node]
2038
  if not isinstance(ins_l, list):
    raise errors.OpExecError("Can't contact node '%s'" %
2040
                             instance.primary_node)
2041

    
2042
  if instance.name in ins_l:
2043
    raise errors.OpExecError("Instance is running, can't shutdown"
2044
                             " block devices.")
2045

    
2046
  _ShutdownInstanceDisks(instance, cfg)
2047

    
2048

    
2049
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are ignored;
  errors on any other node always mark the shutdown as failed.

  """
  result = True
2059
  for disk in instance.disks:
2060
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2061
      cfg.SetDiskID(top_disk, node)
2062
      if not rpc.call_blockdev_shutdown(node, top_disk):
2063
        logger.Error("could not shutdown block device %s on node %s" %
2064
                     (disk.iv_name, node))
2065
        if not ignore_primary or node != instance.primary_node:
2066
          result = False
2067
  return result
2068

    
2069

    
2070
def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2086
  if not nodeinfo or not isinstance(nodeinfo, dict):
2087
    raise errors.OpPrereqError("Could not contact node %s for resource"
2088
                             " information" % (node,))
2089

    
2090
  free_mem = nodeinfo[node].get('memory_free')
2091
  if not isinstance(free_mem, int):
2092
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2093
                             " was '%s'" % (node, free_mem))
2094
  if requested > free_mem:
2095
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2096
                             " needed %s MiB, available %s MiB" %
2097
                             (node, reason, requested, free_mem))
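
# Example call into _CheckNodeFreeMemory (hypothetical node name and memory
# size); this mirrors how LUStartupInstance and LUFailoverInstance use it:
#   _CheckNodeFreeMemory(self.cfg, "node1.example.com",
#                        "starting instance inst1.example.com", 512)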
2098

    
2099

    
2100
class LUStartupInstance(LogicalUnit):
2101
  """Starts an instance.
2102

2103
  """
2104
  HPATH = "instance-start"
2105
  HTYPE = constants.HTYPE_INSTANCE
2106
  _OP_REQP = ["instance_name", "force"]
2107
  REQ_BGL = False
2108

    
2109
  def ExpandNames(self):
2110
    self._ExpandAndLockInstance()
2111
    self.needed_locks[locking.LEVEL_NODE] = []
2112
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2113

    
2114
  def DeclareLocks(self, level):
2115
    if level == locking.LEVEL_NODE:
2116
      self._LockInstancesNodes()
2117

    
2118
  def BuildHooksEnv(self):
2119
    """Build hooks env.
2120

2121
    This runs on master, primary and secondary nodes of the instance.
2122

2123
    """
2124
    env = {
2125
      "FORCE": self.op.force,
2126
      }
2127
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2128
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2129
          list(self.instance.secondary_nodes))
2130
    return env, nl, nl
2131

    
2132
  def CheckPrereq(self):
2133
    """Check prerequisites.
2134

2135
    This checks that the instance is in the cluster.
2136

2137
    """
2138
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2139
    assert self.instance is not None, \
2140
      "Cannot retrieve locked instance %s" % self.op.instance_name
2141

    
2142
    # check bridge existence
    _CheckInstanceBridgesExist(instance)
2144

    
2145
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2146
                         "starting instance %s" % instance.name,
2147
                         instance.memory)
2148

    
2149
  def Exec(self, feedback_fn):
2150
    """Start the instance.
2151

2152
    """
2153
    instance = self.instance
2154
    force = self.op.force
2155
    extra_args = getattr(self.op, "extra_args", "")
2156

    
2157
    self.cfg.MarkInstanceUp(instance.name)
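    # note that the instance is marked administratively 'up' in the
    # configuration before the actual start is attempted; a failed start
    # below only shuts the disks down again and raises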
2158

    
2159
    node_current = instance.primary_node
2160

    
2161
    _StartInstanceDisks(self.cfg, instance, force)
2162

    
2163
    if not rpc.call_instance_start(node_current, instance, extra_args):
2164
      _ShutdownInstanceDisks(instance, self.cfg)
2165
      raise errors.OpExecError("Could not start instance")
2166

    
2167

    
2168
class LURebootInstance(LogicalUnit):
2169
  """Reboot an instance.
2170

2171
  """
2172
  HPATH = "instance-reboot"
2173
  HTYPE = constants.HTYPE_INSTANCE
2174
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2175
  REQ_BGL = False
2176

    
2177
  def ExpandNames(self):
2178
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2179
                                   constants.INSTANCE_REBOOT_HARD,
2180
                                   constants.INSTANCE_REBOOT_FULL]:
2181
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2182
                                  (constants.INSTANCE_REBOOT_SOFT,
2183
                                   constants.INSTANCE_REBOOT_HARD,
2184
                                   constants.INSTANCE_REBOOT_FULL))
2185
    self._ExpandAndLockInstance()
2186
    self.needed_locks[locking.LEVEL_NODE] = []
2187
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2188

    
2189
  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # only a full reboot touches the secondary nodes (via the disk
      # shutdown/startup cycle); the other reboot types only need the
      # primary node locked
      primary_only = self.op.reboot_type != constants.INSTANCE_REBOOT_FULL
      self._LockInstancesNodes(primary_only=primary_only)

    
2194
  def BuildHooksEnv(self):
2195
    """Build hooks env.
2196

2197
    This runs on master, primary and secondary nodes of the instance.
2198

2199
    """
2200
    env = {
2201
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2202
      }
2203
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2204
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2205
          list(self.instance.secondary_nodes))
2206
    return env, nl, nl
2207

    
2208
  def CheckPrereq(self):
2209
    """Check prerequisites.
2210

2211
    This checks that the instance is in the cluster.
2212

2213
    """
2214
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2215
    assert self.instance is not None, \
2216
      "Cannot retrieve locked instance %s" % self.op.instance_name
2217

    
2218
    # check bridge existence
    _CheckInstanceBridgesExist(instance)
2220

    
2221
  def Exec(self, feedback_fn):
2222
    """Reboot the instance.
2223

2224
    """
2225
    instance = self.instance
2226
    ignore_secondaries = self.op.ignore_secondaries
2227
    reboot_type = self.op.reboot_type
2228
    extra_args = getattr(self.op, "extra_args", "")
2229

    
2230
    node_current = instance.primary_node
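    # soft and hard reboots are delegated to the hypervisor on the primary
    # node; a full reboot is emulated below as shutdown + disk cycle + start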
2231

    
2232
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2233
                       constants.INSTANCE_REBOOT_HARD]:
2234
      if not rpc.call_instance_reboot(node_current, instance,
2235
                                      reboot_type, extra_args):
2236
        raise errors.OpExecError("Could not reboot instance")
2237
    else:
2238
      if not rpc.call_instance_shutdown(node_current, instance):
2239
        raise errors.OpExecError("could not shutdown instance for full reboot")
2240
      _ShutdownInstanceDisks(instance, self.cfg)
2241
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2242
      if not rpc.call_instance_start(node_current, instance, extra_args):
2243
        _ShutdownInstanceDisks(instance, self.cfg)
2244
        raise errors.OpExecError("Could not start instance for full reboot")
2245

    
2246
    self.cfg.MarkInstanceUp(instance.name)
2247

    
2248

    
2249
class LUShutdownInstance(LogicalUnit):
2250
  """Shutdown an instance.
2251

2252
  """
2253
  HPATH = "instance-stop"
2254
  HTYPE = constants.HTYPE_INSTANCE
2255
  _OP_REQP = ["instance_name"]
2256
  REQ_BGL = False
2257

    
2258
  def ExpandNames(self):
2259
    self._ExpandAndLockInstance()
2260
    self.needed_locks[locking.LEVEL_NODE] = []
2261
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2262

    
2263
  def DeclareLocks(self, level):
2264
    if level == locking.LEVEL_NODE:
2265
      self._LockInstancesNodes()
2266

    
2267
  def BuildHooksEnv(self):
2268
    """Build hooks env.
2269

2270
    This runs on master, primary and secondary nodes of the instance.
2271

2272
    """
2273
    env = _BuildInstanceHookEnvByObject(self.instance)
2274
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2275
          list(self.instance.secondary_nodes))
2276
    return env, nl, nl
2277

    
2278
  def CheckPrereq(self):
2279
    """Check prerequisites.
2280

2281
    This checks that the instance is in the cluster.
2282

2283
    """
2284
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2285
    assert self.instance is not None, \
2286
      "Cannot retrieve locked instance %s" % self.op.instance_name
2287

    
2288
  def Exec(self, feedback_fn):
2289
    """Shutdown the instance.
2290

2291
    """
2292
    instance = self.instance
2293
    node_current = instance.primary_node
2294
    self.cfg.MarkInstanceDown(instance.name)
2295
    if not rpc.call_instance_shutdown(node_current, instance):
2296
      logger.Error("could not shutdown instance")
2297

    
2298
    _ShutdownInstanceDisks(instance, self.cfg)
2299

    
2300

    
2301
class LUReinstallInstance(LogicalUnit):
2302
  """Reinstall an instance.
2303

2304
  """
2305
  HPATH = "instance-reinstall"
2306
  HTYPE = constants.HTYPE_INSTANCE
2307
  _OP_REQP = ["instance_name"]
2308
  REQ_BGL = False
2309

    
2310
  def ExpandNames(self):
2311
    self._ExpandAndLockInstance()
2312
    self.needed_locks[locking.LEVEL_NODE] = []
2313
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2314

    
2315
  def DeclareLocks(self, level):
2316
    if level == locking.LEVEL_NODE:
2317
      self._LockInstancesNodes()
2318

    
2319
  def BuildHooksEnv(self):
2320
    """Build hooks env.
2321

2322
    This runs on master, primary and secondary nodes of the instance.
2323

2324
    """
2325
    env = _BuildInstanceHookEnvByObject(self.instance)
2326
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2327
          list(self.instance.secondary_nodes))
2328
    return env, nl, nl
2329

    
2330
  def CheckPrereq(self):
2331
    """Check prerequisites.
2332

2333
    This checks that the instance is in the cluster and is not running.
2334

2335
    """
2336
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2337
    assert instance is not None, \
2338
      "Cannot retrieve locked instance %s" % self.op.instance_name
2339

    
2340
    if instance.disk_template == constants.DT_DISKLESS:
2341
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2342
                                 self.op.instance_name)
2343
    if instance.status != "down":
2344
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2345
                                 self.op.instance_name)
2346
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2347
    if remote_info:
2348
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2349
                                 (self.op.instance_name,
2350
                                  instance.primary_node))
2351

    
2352
    self.op.os_type = getattr(self.op, "os_type", None)
2353
    if self.op.os_type is not None:
2354
      # OS verification
2355
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2361
      if not os_obj:
2362
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2363
                                   " primary node"  % self.op.os_type)
2364

    
2365
    self.instance = instance
2366

    
2367
  def Exec(self, feedback_fn):
2368
    """Reinstall the instance.
2369

2370
    """
2371
    inst = self.instance
2372

    
2373
    if self.op.os_type is not None:
2374
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2375
      inst.os = self.op.os_type
2376
      self.cfg.AddInstance(inst)
2377

    
2378
    _StartInstanceDisks(self.cfg, inst, None)
2379
    try:
2380
      feedback_fn("Running the instance OS create scripts...")
2381
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2382
        raise errors.OpExecError("Could not install OS for instance %s"
2383
                                 " on node %s" %
2384
                                 (inst.name, inst.primary_node))
2385
    finally:
2386
      _ShutdownInstanceDisks(inst, self.cfg)
2387

    
2388

    
2389
class LURenameInstance(LogicalUnit):
2390
  """Rename an instance.
2391

2392
  """
2393
  HPATH = "instance-rename"
2394
  HTYPE = constants.HTYPE_INSTANCE
2395
  _OP_REQP = ["instance_name", "new_name"]
2396

    
2397
  def BuildHooksEnv(self):
2398
    """Build hooks env.
2399

2400
    This runs on master, primary and secondary nodes of the instance.
2401

2402
    """
2403
    env = _BuildInstanceHookEnvByObject(self.instance)
2404
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2405
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2406
          list(self.instance.secondary_nodes))
2407
    return env, nl, nl
2408

    
2409
  def CheckPrereq(self):
2410
    """Check prerequisites.
2411

2412
    This checks that the instance is in the cluster and is not running.
2413

2414
    """
2415
    instance = self.cfg.GetInstanceInfo(
2416
      self.cfg.ExpandInstanceName(self.op.instance_name))
2417
    if instance is None:
2418
      raise errors.OpPrereqError("Instance '%s' not known" %
2419
                                 self.op.instance_name)
2420
    if instance.status != "down":
2421
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2422
                                 self.op.instance_name)
2423
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2424
    if remote_info:
2425
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2426
                                 (self.op.instance_name,
2427
                                  instance.primary_node))
2428
    self.instance = instance
2429

    
2430
    # new name verification
2431
    name_info = utils.HostInfo(self.op.new_name)
2432

    
2433
    self.op.new_name = new_name = name_info.name
2434
    instance_list = self.cfg.GetInstanceList()
2435
    if new_name in instance_list:
2436
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2437
                                 new_name)
2438

    
2439
    if not getattr(self.op, "ignore_ip", False):
2440
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2441
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2442
                                   (name_info.ip, new_name))
2443

    
2444

    
2445
  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
2450
    old_name = inst.name
2451

    
2452
    if inst.disk_template == constants.DT_FILE:
2453
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2454

    
2455
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2456
    # Change the instance lock. This is definitely safe while we hold the BGL
2457
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2458
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2459

    
2460
    # re-read the instance from the configuration after rename
2461
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2462

    
2463
    if inst.disk_template == constants.DT_FILE:
2464
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2465
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
2466
                                                old_file_storage_dir,
2467
                                                new_file_storage_dir)
2468

    
2469
      if not result:
2470
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2471
                                 " directory '%s' to '%s' (but the instance"
2472
                                 " has been renamed in Ganeti)" % (
2473
                                 inst.primary_node, old_file_storage_dir,
2474
                                 new_file_storage_dir))
2475

    
2476
      if not result[0]:
2477
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2478
                                 " (but the instance has been renamed in"
2479
                                 " Ganeti)" % (old_file_storage_dir,
2480
                                               new_file_storage_dir))
2481

    
2482
    _StartInstanceDisks(self.cfg, inst, None)
2483
    try:
2484
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2485
                                          "sda", "sdb"):
2486
        msg = ("Could not run OS rename script for instance %s on node %s"
2487
               " (but the instance has been renamed in Ganeti)" %
2488
               (inst.name, inst.primary_node))
2489
        logger.Error(msg)
2490
    finally:
2491
      _ShutdownInstanceDisks(inst, self.cfg)
2492

    
2493

    
2494
class LURemoveInstance(LogicalUnit):
2495
  """Remove an instance.
2496

2497
  """
2498
  HPATH = "instance-remove"
2499
  HTYPE = constants.HTYPE_INSTANCE
2500
  _OP_REQP = ["instance_name", "ignore_failures"]
2501
  REQ_BGL = False
2502

    
2503
  def ExpandNames(self):
2504
    self._ExpandAndLockInstance()
2505
    self.needed_locks[locking.LEVEL_NODE] = []
2506
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2507

    
2508
  def DeclareLocks(self, level):
2509
    if level == locking.LEVEL_NODE:
2510
      self._LockInstancesNodes()
2511

    
2512
  def BuildHooksEnv(self):
2513
    """Build hooks env.
2514

2515
    This runs on master, primary and secondary nodes of the instance.
2516

2517
    """
2518
    env = _BuildInstanceHookEnvByObject(self.instance)
2519
    nl = [self.sstore.GetMasterNode()]
2520
    return env, nl, nl
2521

    
2522
  def CheckPrereq(self):
2523
    """Check prerequisites.
2524

2525
    This checks that the instance is in the cluster.
2526

2527
    """
2528
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2529
    assert self.instance is not None, \
2530
      "Cannot retrieve locked instance %s" % self.op.instance_name
2531

    
2532
  def Exec(self, feedback_fn):
2533
    """Remove the instance.
2534

2535
    """
2536
    instance = self.instance
2537
    logger.Info("shutting down instance %s on node %s" %
2538
                (instance.name, instance.primary_node))
2539

    
2540
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2541
      if self.op.ignore_failures:
2542
        feedback_fn("Warning: can't shutdown instance")
2543
      else:
2544
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2545
                                 (instance.name, instance.primary_node))
2546

    
2547
    logger.Info("removing block devices for instance %s" % instance.name)
2548

    
2549
    if not _RemoveDisks(instance, self.cfg):
2550
      if self.op.ignore_failures:
2551
        feedback_fn("Warning: can't remove instance's disks")
2552
      else:
2553
        raise errors.OpExecError("Can't remove instance's disks")
2554

    
2555
    logger.Info("removing instance %s out of cluster config" % instance.name)
2556

    
2557
    self.cfg.RemoveInstance(instance.name)
2558
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2559

    
2560

    
2561
class LUQueryInstances(NoHooksLU):
2562
  """Logical unit for querying instances.
2563

2564
  """
2565
  _OP_REQP = ["output_fields", "names"]
2566
  REQ_BGL = False
2567

    
2568
  def ExpandNames(self):
2569
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2570
    self.static_fields = frozenset([
2571
      "name", "os", "pnode", "snodes",
2572
      "admin_state", "admin_ram",
2573
      "disk_template", "ip", "mac", "bridge",
2574
      "sda_size", "sdb_size", "vcpus", "tags",
2575
      "network_port", "kernel_path", "initrd_path",
2576
      "hvm_boot_order", "hvm_acpi", "hvm_pae",
2577
      "hvm_cdrom_image_path", "hvm_nic_type",
2578
      "hvm_disk_type", "vnc_bind_address",
2579
      ])
2580
    _CheckOutputFields(static=self.static_fields,
2581
                       dynamic=self.dynamic_fields,
2582
                       selected=self.op.output_fields)
2583

    
2584
    self.needed_locks = {}
2585
    self.share_locks[locking.LEVEL_INSTANCE] = 1
2586
    self.share_locks[locking.LEVEL_NODE] = 1
2587

    
2588
    if self.op.names:
2589
      self.wanted = _GetWantedInstances(self, self.op.names)
2590
    else:
2591
      self.wanted = locking.ALL_SET
2592

    
2593
    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
2594
    if self.do_locking:
2595
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2596
      self.needed_locks[locking.LEVEL_NODE] = []
2597
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2598

    
2599
  def DeclareLocks(self, level):
2600
    if level == locking.LEVEL_NODE and self.do_locking:
2601
      self._LockInstancesNodes()
2602

    
2603
  def CheckPrereq(self):
2604
    """Check prerequisites.
2605

2606
    """
2607
    pass
2608

    
2609
  def Exec(self, feedback_fn):
2610
    """Computes the list of nodes and their attributes.
2611

2612
    """
2613
    all_info = self.cfg.GetAllInstancesInfo()
2614
    if self.do_locking:
2615
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2616
    elif self.wanted != locking.ALL_SET:
2617
      instance_names = self.wanted
2618
      missing = set(instance_names).difference(all_info.keys())
2619
      if missing:
2620
        raise errors.OpExecError(
          "Some instances were removed before retrieving their data: %s"
          % missing)
    else:
2624
      instance_names = all_info.keys()
2625
    instance_list = [all_info[iname] for iname in instance_names]
2626

    
2627
    # begin data gathering
2628

    
2629
    nodes = frozenset([inst.primary_node for inst in instance_list])
2630

    
2631
    bad_nodes = []
2632
    if self.dynamic_fields.intersection(self.op.output_fields):
2633
      live_data = {}
2634
      node_data = rpc.call_all_instances_info(nodes)
2635
      for name in nodes:
2636
        result = node_data[name]
2637
        if result:
2638
          live_data.update(result)
2639
        elif result == False:
2640
          bad_nodes.append(name)
2641
        # else no instance is alive
2642
    else:
2643
      live_data = dict([(name, {}) for name in instance_names])
2644

    
2645
    # end data gathering
2646

    
2647
    output = []
2648
    for instance in instance_list:
2649
      iout = []
2650
      for field in self.op.output_fields:
2651
        if field == "name":
2652
          val = instance.name
2653
        elif field == "os":
2654
          val = instance.os
2655
        elif field == "pnode":
2656
          val = instance.primary_node
2657
        elif field == "snodes":
2658
          val = list(instance.secondary_nodes)
2659
        elif field == "admin_state":
2660
          val = (instance.status != "down")
2661
        elif field == "oper_state":
2662
          if instance.primary_node in bad_nodes:
2663
            val = None
2664
          else:
2665
            val = bool(live_data.get(instance.name))
2666
        elif field == "status":
2667
          if instance.primary_node in bad_nodes:
2668
            val = "ERROR_nodedown"
2669
          else:
2670
            running = bool(live_data.get(instance.name))
2671
            if running:
2672
              if instance.status != "down":
2673
                val = "running"
2674
              else:
2675
                val = "ERROR_up"
2676
            else:
2677
              if instance.status != "down":
2678
                val = "ERROR_down"
2679
              else:
2680
                val = "ADMIN_down"
2681
        elif field == "admin_ram":
2682
          val = instance.memory
2683
        elif field == "oper_ram":
2684
          if instance.primary_node in bad_nodes:
2685
            val = None
2686
          elif instance.name in live_data:
2687
            val = live_data[instance.name].get("memory", "?")
2688
          else:
2689
            val = "-"
2690
        elif field == "disk_template":
2691
          val = instance.disk_template
2692
        elif field == "ip":
2693
          val = instance.nics[0].ip
2694
        elif field == "bridge":
2695
          val = instance.nics[0].bridge
2696
        elif field == "mac":
2697
          val = instance.nics[0].mac
2698
        elif field == "sda_size" or field == "sdb_size":
2699
          disk = instance.FindDisk(field[:3])
2700
          if disk is None:
2701
            val = None
2702
          else:
2703
            val = disk.size
2704
        elif field == "vcpus":
2705
          val = instance.vcpus
2706
        elif field == "tags":
2707
          val = list(instance.GetTags())
2708
        elif field in ("network_port", "kernel_path", "initrd_path",
2709
                       "hvm_boot_order", "hvm_acpi", "hvm_pae",
2710
                       "hvm_cdrom_image_path", "hvm_nic_type",
2711
                       "hvm_disk_type", "vnc_bind_address"):
2712
          val = getattr(instance, field, None)
2713
          if val is not None:
2714
            pass
2715
          elif field in ("hvm_nic_type", "hvm_disk_type",
2716
                         "kernel_path", "initrd_path"):
2717
            val = "default"
2718
          else:
2719
            val = "-"
2720
        else:
2721
          raise errors.ParameterError(field)
2722
        iout.append(val)
2723
      output.append(iout)
2724

    
2725
    return output
2726

    
2727

    
2728
class LUFailoverInstance(LogicalUnit):
2729
  """Failover an instance.
2730

2731
  """
2732
  HPATH = "instance-failover"
2733
  HTYPE = constants.HTYPE_INSTANCE
2734
  _OP_REQP = ["instance_name", "ignore_consistency"]
2735
  REQ_BGL = False
2736

    
2737
  def ExpandNames(self):
2738
    self._ExpandAndLockInstance()
2739
    self.needed_locks[locking.LEVEL_NODE] = []
2740
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2741

    
2742
  def DeclareLocks(self, level):
2743
    if level == locking.LEVEL_NODE:
2744
      self._LockInstancesNodes()
2745

    
2746
  def BuildHooksEnv(self):
2747
    """Build hooks env.
2748

2749
    This runs on master, primary and secondary nodes of the instance.
2750

2751
    """
2752
    env = {
2753
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2754
      }
2755
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2756
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2757
    return env, nl, nl
2758

    
2759
  def CheckPrereq(self):
2760
    """Check prerequisites.
2761

2762
    This checks that the instance is in the cluster.
2763

2764
    """
2765
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2766
    assert self.instance is not None, \
2767
      "Cannot retrieve locked instance %s" % self.op.instance_name
2768

    
2769
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2770
      raise errors.OpPrereqError("Instance's disk layout is not"
2771
                                 " network mirrored, cannot failover.")
2772

    
2773
    secondary_nodes = instance.secondary_nodes
2774
    if not secondary_nodes:
2775
      raise errors.ProgrammerError("no secondary node but using "
2776
                                   "a mirrored disk template")
2777

    
2778
    target_node = secondary_nodes[0]
2779
    # check memory requirements on the secondary node
2780
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2781
                         instance.name, instance.memory)
2782

    
2783
    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s do not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))
2789

    
2790
  def Exec(self, feedback_fn):
2791
    """Failover an instance.
2792

2793
    The failover is done by shutting it down on its present node and
2794
    starting it on the secondary.
2795

2796
    """
2797
    instance = self.instance
2798

    
2799
    source_node = instance.primary_node
2800
    target_node = instance.secondary_nodes[0]
2801

    
2802
    feedback_fn("* checking disk consistency between source and target")
2803
    for dev in instance.disks:
2804
      # for drbd, these are drbd over lvm
2805
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2806
        if instance.status == "up" and not self.op.ignore_consistency:
2807
          raise errors.OpExecError("Disk %s is degraded on target node,"
2808
                                   " aborting failover." % dev.iv_name)
2809

    
2810
    feedback_fn("* shutting down instance on source node")
2811
    logger.Info("Shutting down instance %s on node %s" %
2812
                (instance.name, source_node))
2813

    
2814
    if not rpc.call_instance_shutdown(source_node, instance):
2815
      if self.op.ignore_consistency:
2816
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2817
                     " anyway. Please make sure node %s is down"  %
2818
                     (instance.name, source_node, source_node))
2819
      else:
2820
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2821
                                 (instance.name, source_node))
2822

    
2823
    feedback_fn("* deactivating the instance's disks on source node")
2824
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2825
      raise errors.OpExecError("Can't shut down the instance's disks.")
2826

    
2827
    instance.primary_node = target_node
2828
    # distribute new instance config to the other nodes
2829
    self.cfg.Update(instance)
2830

    
2831
    # Only start the instance if it's marked as up
2832
    if instance.status == "up":
2833
      feedback_fn("* activating the instance's disks on target node")
2834
      logger.Info("Starting instance %s on node %s" %
2835
                  (instance.name, target_node))
2836

    
2837
      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2838
                                               ignore_secondaries=True)
2839
      if not disks_ok:
2840
        _ShutdownInstanceDisks(instance, self.cfg)
2841
        raise errors.OpExecError("Can't activate the instance's disks")
2842

    
2843
      feedback_fn("* starting the instance on the target node")
2844
      if not rpc.call_instance_start(target_node, instance, None):
2845
        _ShutdownInstanceDisks(instance, self.cfg)
2846
        raise errors.OpExecError("Could not start instance %s on node %s." %
2847
                                 (instance.name, target_node))
2848

    
2849

    
2850
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2851
  """Create a tree of block devices on the primary node.
2852

2853
  This always creates all devices.
2854

2855
  """
2856
  if device.children:
2857
    for child in device.children:
2858
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2859
        return False
2860

    
2861
  cfg.SetDiskID(device, node)
2862
  new_id = rpc.call_blockdev_create(node, device, device.size,
2863
                                    instance.name, True, info)
2864
  if not new_id:
2865
    return False
2866
  if device.physical_id is None:
2867
    device.physical_id = new_id
2868
  return True
2869

    
2870

    
2871
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2872
  """Create a tree of block devices on a secondary node.
2873

2874
  If this device type has to be created on secondaries, create it and
2875
  all its children.
2876

2877
  If not, just recurse to children keeping the same 'force' value.
2878

2879
  """
2880
  if device.CreateOnSecondary():
2881
    force = True
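    # from here on the entire subtree is physically created on the
    # secondary: a device that requests CreateOnSecondary() forces creation
    # of itself and all of its children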
2882
  if device.children:
2883
    for child in device.children:
2884
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2885
                                        child, force, info):
2886
        return False
2887

    
2888
  if not force:
2889
    return True
2890
  cfg.SetDiskID(device, node)
2891
  new_id = rpc.call_blockdev_create(node, device, device.size,
2892
                                    instance.name, False, info)
2893
  if not new_id:
2894
    return False
2895
  if device.physical_id is None:
2896
    device.physical_id = new_id
2897
  return True
2898

    
2899

    
2900
def _GenerateUniqueNames(cfg, exts):
  """Generate a set of suitable LV names.

  This will generate one unique logical volume name for each of the
  given extensions.

  """
  results = []
2907
  for val in exts:
2908
    new_id = cfg.GenerateUniqueID()
2909
    results.append("%s%s" % (new_id, val))
2910
  return results
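
# The names produced by _GenerateUniqueNames become the second element of
# each LV's logical_id tuple (vgname, lv_name) in the disk templates below.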
2911

    
2912

    
2913
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name,
2914
                         p_minor, s_minor):
2915
  """Generate a drbd8 device complete with its children.
2916

2917
  """
2918
  port = cfg.AllocatePort()
2919
  vgname = cfg.GetVGName()
2920
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2921
                          logical_id=(vgname, names[0]))
2922
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2923
                          logical_id=(vgname, names[1]))
2924
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2925
                          logical_id=(primary, secondary, port,
2926
                                      p_minor, s_minor),
2927
                          children=[dev_data, dev_meta],
2928
                          iv_name=iv_name)
2929
  return drbd_dev
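
# The branch assembled above pairs a data LV of the requested size with a
# 128MB metadata LV and exposes them as a single DRBD8 device whose
# logical_id is (primary, secondary, port, p_minor, s_minor).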
2930

    
2931

    
2932
def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
    disks = []
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name="sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name="sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    (minor_pa, minor_pb,
     minor_sa, minor_sb) = cfg.AllocateDRBDMinor(
      [primary_node, primary_node, remote_node, remote_node], instance_name)

    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                        disk_sz, names[0:2], "sda",
                                        minor_pa, minor_sa)
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                        swap_sz, names[2:4], "sdb",
                                        minor_pb, minor_sb)
    disks = [drbd_sda_dev, drbd_sdb_dev]
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
                                iv_name="sda", logical_id=(file_driver,
                                "%s/sda" % file_storage_dir))
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
                                iv_name="sdb", logical_id=(file_driver,
                                "%s/sdb" % file_storage_dir))
    disks = [file_sda_dev, file_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks


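# Hypothetical usage sketch (argument values are made up for illustration):
#   disks = _GenerateDiskTemplate(cfg, constants.DT_DRBD8,
#                                 "inst1.example.com", "node1", ["node2"],
#                                 10240, 4096, "", None)
# would return two drbd8 branches ("sda" of 10240 MB and "sdb" of 4096 MB),
# while constants.DT_PLAIN with secondary_nodes=[] returns two plain LVs; the
# file_storage_dir/file_driver arguments only matter for constants.DT_FILE.

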
def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = rpc.call_file_storage_dir_create(instance.primary_node,
                                              file_storage_dir)

    if not result:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False

    if not result[0]:
      logger.Error("failed to create directory '%s'" % file_storage_dir)
      return False

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False

  return True


def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
                                            file_storage_dir):
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      result = False

  return result


def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group.

  This is currently hard-coded for the two-drive layout.

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    # 256 MB are added for drbd metadata, 128 MB for each drbd device
    constants.DT_DRBD8: disk_size + swap_size + 256,
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]


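# Worked example for the table above (sizes in MB): a DT_DRBD8 instance with
# disk_size=10240 and swap_size=4096 needs 10240 + 4096 + 256 = 14592 MB of
# free volume group space on each of the two nodes, while DT_DISKLESS and
# DT_FILE return None (no volume group space check is performed for them).

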
class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync", "ip_check", "mac"]
  REQ_BGL = False

  def _ExpandNode(self, node):
    """Expands and checks one node name.

    """
    node_full = self.cfg.ExpandNodeName(node)
    if node_full is None:
      raise errors.OpPrereqError("Unknown node %s" % node)
    return node_full

  def ExpandNames(self):
    """ExpandNames for CreateInstance.

    Figure out the right locks for instance creation.

    """
    self.needed_locks = {}

    # set optional parameters to None if they don't exist
    for attr in ["kernel_path", "initrd_path", "pnode", "snode",
                 "iallocator", "hvm_boot_order", "hvm_acpi", "hvm_pae",
                 "hvm_cdrom_image_path", "hvm_nic_type", "hvm_disk_type",
                 "vnc_bind_address"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    # verify creation mode
    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)
    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    #### instance parameters check

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)
    self.op.instance_name = instance_name = hostname1.name

    # this is just a preventive check, but someone might still add this
    # instance in the meantime, and creation will fail at lock-add time
    if instance_name in self.cfg.GetInstanceList():
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    self.add_locks[locking.LEVEL_INSTANCE] = instance_name

    # ip validity checks
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = self.op.ip = inst_ip
    # used in CheckPrereq for ip ping check
    self.check_ip = hostname1.ip

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")
    # file storage checks
    if (self.op.file_driver and
        self.op.file_driver not in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory path not absolute")

    ### Node/iallocator related checks
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.op.pnode = self._ExpandNode(self.op.pnode)
      nodelist = [self.op.pnode]
      if self.op.snode is not None:
        self.op.snode = self._ExpandNode(self.op.snode)
        nodelist.append(self.op.snode)
      self.needed_locks[locking.LEVEL_NODE] = nodelist

    # in case of import lock the source node too
    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)

      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      self.op.src_node = src_node = self._ExpandNode(src_node)
      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
        self.needed_locks[locking.LEVEL_NODE].append(src_node)

    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

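  # Resulting lock declaration, sketched for orientation only (the names are
  # made up): with pnode="node1" and snode="node2" the method above leaves
  #   self.add_locks[locking.LEVEL_INSTANCE] = "inst1.example.com"
  #   self.needed_locks[locking.LEVEL_NODE] = ["node1", "node2"]
  # whereas with an iallocator the node level is locking.ALL_SET, since the
  # allocator may place the instance on any node in the cluster.
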
  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    disks = [{"size": self.op.disk_size, "mode": "w"},
             {"size": self.op.swap_size, "mode": "w"}]
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
             "bridge": self.op.bridge}]
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.op.vcpus,
                     mem_size=self.op.mem_size,
                     disks=disks,
                     nics=nics,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    logger.ToStdout("Selected nodes for the instance: %s" %
                    (", ".join(ial.nodes),))
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
                (self.op.instance_name, self.op.iallocator, ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    ))

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = self.op.src_node
      src_path = self.op.src_path

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage

    # ip ping checks (we use the same ip that was resolved in ExpandNames)

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (self.check_ip, self.op.instance_name))

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    #### allocator run

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
    assert self.pnode is not None, \
      "Cannot retrieve locked node %s" % self.op.pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if self.op.snode is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")
      if self.op.snode == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(self.op.snode)

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node" % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")

    # bridge check on primary node
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.op.mem_size)

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      # FIXME (als): shouldn't these checks happen on the destination node?
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    # Xen HVM device type checks
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
        raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
                                   " hypervisor" % self.op.hvm_nic_type)
      if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
        raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
                                   " hypervisor" % self.op.hvm_disk_type)

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

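  # Example of the vg_free check above (the values are illustrative only):
  # for a drbd8 request with disk_size=10240 and swap_size=4096, req_size is
  # 14592 MB, so a node whose rpc.call_node_info result reports
  # {'vg_free': 12000, ...} is rejected with "Not enough disk space".
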
  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    if self.op.vnc_bind_address is None:
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.sstore.GetFileStorageDir(),
                                        string_file_storage_dir, instance))

    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            hvm_acpi=self.op.hvm_acpi,
                            hvm_pae=self.op.hvm_pae,
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
                            vnc_bind_address=self.op.vnc_bind_address,
                            hvm_nic_type=self.op.hvm_nic_type,
                            hvm_disk_type=self.op.hvm_disk_type,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      _RemoveDisks(iobj, self.cfg)
      self.cfg.ReleaseDRBDMinors(instance)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Declare that we don't want to remove the instance lock anymore, as we've
    # added the instance to the config
    del self.remove_locks[locking.LEVEL_INSTANCE]
    # Remove the temp. assignments for the instance's drbds
    self.cfg.ReleaseDRBDMinors(instance)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      # Make sure the instance lock gets removed
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                           src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Connect to the console of an instance.

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    elif self.op.remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.op.remote_node = remote_node
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    # If we're not already locking all nodes in the set we have to declare the
    # instance's primary/secondary nodes.
    if (level == locking.LEVEL_NODE and
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
      self._LockInstancesNodes()

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    logger.ToStdout("Selected new secondary for the instance: %s" %
                    self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if dev.iv_name not in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if dev.iv_name not in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node == instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if dev.iv_name not in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue

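  # Worked example of the rename step above (names are illustrative only):
  # if an old data LV has physical_id ("xenvg", "<uuid>.sda_data") and
  # temp_suffix is 1199145600, ren_fn returns
  # ("xenvg", "<uuid>.sda_data_replaced-1199145600"); the freshly created LV
  # is then renamed to the original ("xenvg", "<uuid>.sda_data") name before
  # being re-attached to the drbd device as its new child.
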
  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if dev.iv_name not in self.op.disks:
        continue
      info("checking %s on %s" % (dev.iv_name, pri_node))
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find device %s on node %s" %
                                 (dev.iv_name, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if dev.iv_name not in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      size = dev.size
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))

    # Step 4: drbd minors and drbd setups changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
                                   instance.name)
    logging.debug("Allocated minors %s" % (minors,))
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for dev, new_minor in zip(instance.disks, minors):
      size = dev.size
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
      # create new devices on new_node
      if pri_node == dev.logical_id[0]:
        new_logical_id = (pri_node, new_node,
                          dev.logical_id[2], dev.logical_id[3], new_minor)
      else:
        new_logical_id = (new_node, pri_node,
                          dev.logical_id[2], new_minor, dev.logical_id[4])
      iv_names[dev.iv_name] = (dev, dev.children, new_logical_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_logical_id)
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=new_logical_id,
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        self.cfg.ReleaseDRBDMinors(instance.name)
        raise errors.OpExecError("Failed to create new DRBD on"
                                 " node '%s'" % new_node)

    for dev in instance.disks:
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for %s on old node" % dev.iv_name)
      cfg.SetDiskID(dev, old_node)
      if not rpc.call_blockdev_shutdown(old_node, dev):
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    done = 0
    for dev in instance.disks:
      cfg.SetDiskID(dev, pri_node)
      # set the physical (unique in bdev terms) id to None, meaning
      # detach from network
      dev.physical_id = (None, None, None, None, dev.physical_id[4])
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
      if rpc.call_blockdev_find(pri_node, dev):
        done += 1
      else:
        warning("Failed to detach drbd %s from network, unusual case" %
                dev.iv_name)

    if not done:
      # no detaches succeeded (very unlikely)
      self.cfg.ReleaseDRBDMinors(instance.name)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev, _, new_logical_id in iv_names.itervalues():
      dev.logical_id = new_logical_id
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)
    # we can remove now the temp minors as now the new values are
    # written to the config file (and therefore stable)
    self.cfg.ReleaseDRBDMinors(instance.name)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    failures = []
    for dev in instance.disks:
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
      # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      # is correct
      cfg.SetDiskID(dev, pri_node)
      logging.debug("Disk to attach: %s", dev)
      if not rpc.call_blockdev_find(pri_node, dev):
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
                "please do a gnt-instance info to see the status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, _) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, _) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        if not rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")

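  # Example of the logical_id rewrite above (hosts/minors are placeholders):
  # a disk with logical_id ("node1", "node3", 11000, 0, 1) being moved from
  # old secondary "node3" to new secondary "node4" with new_minor=2 becomes
  # ("node1", "node4", 11000, 0, 2), i.e. only the secondary's slot and its
  # minor change, while the primary's entry, its minor and the port are kept.
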
  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance

    # Activate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      _StartInstanceDisks(self.cfg, instance, True)

    if instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")

    ret = fn(feedback_fn)

    # Deactivate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      _SafeShutdownInstanceDisks(instance, self.cfg)

    return ret


class LUGrowDisk(LogicalUnit):
4169
  """Grow a disk of an instance.
4170

4171
  """
4172
  HPATH = "disk-grow"
4173
  HTYPE = constants.HTYPE_INSTANCE
4174
  _OP_REQP = ["instance_name", "disk", "amount"]
4175
  REQ_BGL = False
4176

    
4177
  def ExpandNames(self):
4178
    self._ExpandAndLockInstance()
4179
    self.needed_locks[locking.LEVEL_NODE] = []
4180
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4181

    
4182
  def DeclareLocks(self, level):
4183
    if level == locking.LEVEL_NODE:
4184
      self._LockInstancesNodes()
4185

    
4186
  def BuildHooksEnv(self):
4187
    """Build hooks env.
4188

4189
    This runs on the master, the primary and all the secondaries.
4190

4191
    """
4192
    env = {
4193
      "DISK": self.op.disk,
4194
      "AMOUNT": self.op.amount,
4195
      }
4196
    env.update(_BuildInstanceHookEnvByObject(self.instance))
4197
    nl = [
4198
      self.sstore.GetMasterNode(),
4199
      self.instance.primary_node,
4200
      ]
4201
    return env, nl, nl
4202

    
4203
  def CheckPrereq(self):
4204
    """Check prerequisites.
4205

4206
    This checks that the instance is in the cluster.
4207

4208
    """
4209
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4210
    assert instance is not None, \
4211
      "Cannot retrieve locked instance %s" % self.op.instance_name
4212

    
4213
    self.instance = instance
4214

    
4215
    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4216
      raise errors.OpPrereqError("Instance's disk layout does not support"
4217
                                 " growing.")
4218

    
4219
    if instance.FindDisk(self.op.disk) is None:
4220
      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4221
                                 (self.op.disk, instance.name))
4222

    
4223
    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4224
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
4225
    for node in nodenames:
4226
      info = nodeinfo.get(node, None)
4227
      if not info:
4228
        raise errors.OpPrereqError("Cannot get current information"
4229
                                   " from node '%s'" % node)
4230
      vg_free = info.get('vg_free', None)
4231
      if not isinstance(vg_free, int):
4232
        raise errors.OpPrereqError("Can't compute free disk space on"
4233
                                   " node %s" % node)
4234
      if self.op.amount > info['vg_free']:
4235
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
4236
                                   " %d MiB available, %d MiB required" %
4237
                                   (node, info['vg_free'], self.op.amount))
4238

    
4239
  def Exec(self, feedback_fn):
4240
    """Execute disk grow.
4241

4242
    """
4243
    instance = self.instance
4244
    disk = instance.FindDisk(self.op.disk)
4245
    for node in (instance.secondary_nodes + (instance.primary_node,)):
4246
      self.cfg.SetDiskID(disk, node)
4247
      result = rpc.call_blockdev_grow(node, disk, self.op.amount)
4248
      if not result or not isinstance(result, (list, tuple)) or len(result) != 2:
4249
        raise errors.OpExecError("grow request failed to node %s" % node)
4250
      elif not result[0]:
4251
        raise errors.OpExecError("grow request failed to node %s: %s" %
4252
                                 (node, result[1]))
4253
    disk.RecordGrow(self.op.amount)
4254
    self.cfg.Update(instance)
4255
    return
4256

    
4257

    
4258
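# A rough sketch of the flow implemented by LUGrowDisk above; the numbers in
# the example are purely hypothetical:
#
#   CheckPrereq: every node that holds the disk (primary plus secondaries)
#     must report at least op.amount MiB of free space in its volume group;
#     e.g. a request for 2048 MiB against a node reporting vg_free == 512
#     fails with OpPrereqError.
#   Exec: call_blockdev_grow is expected to answer with a two-element
#     (status, message) sequence per node; anything else is treated as a
#     failed grow request. Only after every node has succeeded is
#     RecordGrow() applied and the configuration updated.
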
class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = self.cfg.ExpandInstanceName(name)
        if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" % name)
        self.wanted_names.append(full_name)
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
    else:
      self.wanted_names = None
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]
    return

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                           instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "vcpus": instance.vcpus,
        }

      htkind = self.sstore.GetHypervisorType()
      if htkind == constants.HT_XEN_PVM30:
        idict["kernel_path"] = instance.kernel_path
        idict["initrd_path"] = instance.initrd_path

      if htkind == constants.HT_XEN_HVM31:
        idict["hvm_boot_order"] = instance.hvm_boot_order
        idict["hvm_acpi"] = instance.hvm_acpi
        idict["hvm_pae"] = instance.hvm_pae
        idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
        idict["hvm_nic_type"] = instance.hvm_nic_type
        idict["hvm_disk_type"] = instance.hvm_disk_type

      if htkind in constants.HTS_REQ_PORT:
        if instance.vnc_bind_address is None:
          vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
        else:
          vnc_bind_address = instance.vnc_bind_address
        if instance.network_port is None:
          vnc_console_port = None
        elif vnc_bind_address == constants.BIND_ADDRESS_GLOBAL:
          vnc_console_port = "%s:%s" % (instance.primary_node,
                                        instance.network_port)
        elif vnc_bind_address == constants.LOCALHOST_IP_ADDRESS:
          vnc_console_port = "%s:%s on node %s" % (vnc_bind_address,
                                                   instance.network_port,
                                                   instance.primary_node)
        else:
          vnc_console_port = "%s:%s" % (instance.vnc_bind_address,
                                        instance.network_port)
        idict["vnc_console_port"] = vnc_console_port
        idict["vnc_bind_address"] = vnc_bind_address
        idict["network_port"] = instance.network_port

      result[instance.name] = idict

    return result


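# For reference, the mapping returned by LUQueryInstanceData.Exec above has
# roughly this shape (one entry per queried instance, values taken from the
# configuration and from the remote instance/block device queries):
#
#   {<instance name>: {
#      "name": ..., "config_state": "up"|"down", "run_state": "up"|"down",
#      "pnode": ..., "snodes": [...], "os": ..., "memory": ..., "vcpus": ...,
#      "nics": [(mac, ip, bridge), ...],
#      "disks": [{"iv_name": ..., "dev_type": ..., "logical_id": ...,
#                 "physical_id": ..., "pstatus": ..., "sstatus": ...,
#                 "children": [...]}, ...],
#      # plus hypervisor-specific keys (kernel_path, hvm_*, vnc_*) where
#      # applicable
#      },
#    ...}
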
class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge or self.mac:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      if self.mac:
        mac = self.mac
      else:
        mac = self.instance.nics[0].mac
      args['nics'] = [(ip, bridge, mac)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    # FIXME: all the parameters could be checked before, in ExpandNames, or in
    # a separate CheckArguments function, if we implement one, so the operation
    # can be aborted without waiting for any lock, should it have an error...
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
    self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
    self.hvm_pae = getattr(self.op, "hvm_pae", None)
    self.hvm_nic_type = getattr(self.op, "hvm_nic_type", None)
    self.hvm_disk_type = getattr(self.op, "hvm_disk_type", None)
    self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
    self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
    self.force = getattr(self.op, "force", None)
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
                 self.kernel_path, self.initrd_path, self.hvm_boot_order,
                 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
                 self.vnc_bind_address, self.hvm_nic_type, self.hvm_disk_type]
    if all_parms.count(None) == len(all_parms):
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    if self.kernel_path is not None:
      self.do_kernel_path = True
      if self.kernel_path == constants.VALUE_NONE:
        raise errors.OpPrereqError("Can't set instance to no kernel")

      if self.kernel_path != constants.VALUE_DEFAULT:
        if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
                                     " filename")
    else:
      self.do_kernel_path = False

    if self.initrd_path is not None:
      self.do_initrd_path = True
      if self.initrd_path not in (constants.VALUE_NONE,
                                  constants.VALUE_DEFAULT):
        if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
                                     " filename")
    else:
      self.do_initrd_path = False

    # boot order verification
    if self.hvm_boot_order is not None:
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
        if len(self.hvm_boot_order.strip("acdn")) != 0:
          raise errors.OpPrereqError("invalid boot order specified,"
                                     " must be one or more of [acdn]"
                                     " or 'default'")

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      if not (os.path.isabs(self.op.hvm_cdrom_image_path) or
              self.op.hvm_cdrom_image_path.lower() == "none"):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not (os.path.isfile(self.op.hvm_cdrom_image_path) or
              self.op.hvm_cdrom_image_path.lower() == "none"):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.warn = []
    if self.mem is not None and not self.force:
      pnode = self.instance.primary_node
      nodelist = [pnode]
      nodelist.extend(instance.secondary_nodes)
      instance_info = rpc.call_instance_info(pnode, instance.name)
      nodeinfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

      if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s" % pnode)
      else:
        if instance_info:
          current_mem = instance_info['memory']
        else:
          # Assume instance not running
          # (there is a slight race condition here, but it's not very probable,
          # and we have no other way to check)
          current_mem = 0
        miss_mem = self.mem - current_mem - nodeinfo[pnode]['memory_free']
        if miss_mem > 0:
          raise errors.OpPrereqError("This change will prevent the instance"
                                     " from starting, due to %d MB of memory"
                                     " missing on its primary node" % miss_mem)

      for node in instance.secondary_nodes:
        if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
          self.warn.append("Can't get info from secondary node %s" % node)
        elif self.mem > nodeinfo[node]['memory_free']:
          self.warn.append("Not enough memory to failover instance to secondary"
                           " node %s" % node)

    # Xen HVM device type checks
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if self.op.hvm_nic_type is not None:
        if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
          raise errors.OpPrereqError("Invalid NIC type %s specified for Xen"
                                     " HVM hypervisor" % self.op.hvm_nic_type)
      if self.op.hvm_disk_type is not None:
        if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
          raise errors.OpPrereqError("Invalid disk type %s specified for Xen"
                                     " HVM hypervisor" % self.op.hvm_disk_type)

    return

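  # The memory check in CheckPrereq above boils down to:
  #   miss_mem = new_mem - current_mem - memory_free(primary node)
  # i.e. only the increase over what the instance currently uses has to fit
  # into the primary node's free memory. With purely illustrative numbers:
  # raising an instance from 512 to 2048 MB on a primary reporting 1024 MB
  # free gives miss_mem = 2048 - 512 - 1024 = 512 > 0, so the change is
  # refused unless the force flag is given.
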
  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    # Process here the warnings from CheckPrereq, as we don't have a
    # feedback_fn there.
    for warn in self.warn:
      feedback_fn("WARNING: %s" % warn)

    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        instance.hvm_boot_order = None
      else:
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))
    if self.hvm_acpi is not None:
      instance.hvm_acpi = self.hvm_acpi
      result.append(("hvm_acpi", self.hvm_acpi))
    if self.hvm_pae is not None:
      instance.hvm_pae = self.hvm_pae
      result.append(("hvm_pae", self.hvm_pae))
    if self.hvm_nic_type is not None:
      instance.hvm_nic_type = self.hvm_nic_type
      result.append(("hvm_nic_type", self.hvm_nic_type))
    if self.hvm_disk_type is not None:
      instance.hvm_disk_type = self.hvm_disk_type
      result.append(("hvm_disk_type", self.hvm_disk_type))
    if self.hvm_cdrom_image_path:
      if self.hvm_cdrom_image_path == constants.VALUE_NONE:
        instance.hvm_cdrom_image_path = None
      else:
        instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
      result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
    if self.vnc_bind_address:
      instance.vnc_bind_address = self.vnc_bind_address
      result.append(("vnc_bind_address", self.vnc_bind_address))

    self.cfg.Update(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = ['nodes']
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
          "Cannot retrieve locked instance %s" % self.op.instance_name

    self.dst_node = self.cfg.GetNodeInfo(
      self.cfg.ExpandNodeName(self.op.target_node))

    assert self.dst_node is not None, \
          "Cannot retrieve locked node %s" % self.op.target_node

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

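  # Outline of the export performed by Exec below (a summary of the code,
  # not additional behaviour):
  #   1. optionally shut the instance down (op.shutdown)
  #   2. snapshot the "sda" disk via call_blockdev_snapshot
  #   3. restart the instance if it was shut down and had been running
  #   4. copy the snapshot to the target node (call_snapshot_export) and
  #      remove the snapshot from the source node
  #   5. finalize the export on the target node and prune older exports of
  #      the same instance from the remaining nodes
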
  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(instance, self.cfg)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
        logger.Error("could not export block device %s from node %s to node %s"
                     % (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from node %s" %
                     (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      exportlist = rpc.call_export_list(nodelist)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    exportlist = rpc.call_export_list(self.acquired_locks[locking.LEVEL_NODE])
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
        if not rpc.call_export_remove(node, instance_name):
          logger.Error("could not remove export for instance %s"
                       " on node %s" % (instance_name, node))

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the list of (path, tag) pairs matching the pattern.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


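# LUSearchTags.Exec above returns a list of (path, tag) pairs, where path is
# "/cluster", "/instances/<instance name>" or "/nodes/<node name>"; e.g. a
# pattern matching the (hypothetical) tag "webfarm" could yield
# [("/instances/inst1", "webfarm"), ("/nodes/node3", "webfarm")].
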
class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg/sstore that are needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

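  # Construction sketch (argument values are illustrative only; the required
  # keyword arguments are exactly the members of _ALLO_KEYS or _RELO_KEYS,
  # depending on the mode):
  #
  #   IAllocator(cfg, sstore, constants.IALLOCATOR_MODE_ALLOC, "instance",
  #              mem_size=..., disks=[...], disk_template=..., os=...,
  #              tags=[...], nics=[...], vcpus=...)
  #
  #   IAllocator(cfg, sstore, constants.IALLOCATOR_MODE_RELOC, "instance",
  #              relocate_from=[...])
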
  def __init__(self, cfg, sstore, mode, name, **kwargs):
    self.cfg = cfg
    self.sstore = sstore
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    # cluster data
    data = {
      "version": 1,
      "cluster_name": self.sstore.GetClusterName(),
      "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      # we don't have job IDs
      }

    i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()
    node_data = rpc.call_node_info(node_list, cfg.GetVGName())
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      if nname not in node_data or not isinstance(node_data[nname], dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname]
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += iinfo.memory
          if iinfo.status == "up":
            i_p_up_mem += iinfo.memory

      # compute memory used by instances
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": iinfo.vcpus,
        "memory": iinfo.memory,
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

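  # The in_data document built above is, roughly:
  #
  #   {"version": 1,
  #    "cluster_name": ..., "cluster_tags": [...], "hypervisor_type": ...,
  #    "nodes": {<node name>: {"total_memory": ..., "reserved_memory": ...,
  #                            "free_memory": ..., "i_pri_memory": ...,
  #                            "i_pri_up_memory": ..., "total_disk": ...,
  #                            "free_disk": ..., "primary_ip": ...,
  #                            "secondary_ip": ..., "total_cpus": ...,
  #                            "tags": [...]}, ...},
  #    "instances": {<instance name>: {"should_run": ..., "vcpus": ...,
  #                                    "memory": ..., "os": ..., "nodes": [...],
  #                                    "nics": [...], "disks": [...],
  #                                    "disk_template": ..., "tags": [...]},
  #                  ...}}
  #
  # The _AddNewInstance/_AddRelocateInstance methods below add the "request"
  # key on top of this.
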
  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template,
                                  self.disks[0]["size"], self.disks[1]["size"])

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1

    disk_space = _ComputeDiskSize(instance.disk_template,
                                  instance.disks[0].size,
                                  instance.disks[1].size)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

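  # The "request" entry added by the two methods above differs by mode:
  #   allocate: type, name, disk_template, tags, os, vcpus, memory, disks,
  #             disk_space_total, nics, required_nodes
  #   relocate: type, name, disk_space_total, required_nodes, relocate_from
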
  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
    """Run an instance allocator and return the results.

    """
    data = self.in_text

    result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)

    if not isinstance(result, (list, tuple)) or len(result) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict


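# For reference, _ValidateResult above only checks the following shape in the
# allocator's reply (parsed with serializer.Load); extra keys are kept in
# out_data but not otherwise inspected:
#
#   {"success": ..., "info": ..., "nodes": [<node name>, ...]}
#
# "nodes" must be a list; "success" and "info" are copied verbatim onto the
# IAllocator object as attributes.
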
class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode of
    the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       )
    else:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result