
root / lib / cmdlib.py @ c2c2a903


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import locking
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import serializer
45

    
46

    
47
class LogicalUnit(object):
48
  """Logical Unit base class.
49

50
  Subclasses must follow these rules:
51
    - implement ExpandNames
52
    - implement CheckPrereq
53
    - implement Exec
54
    - implement BuildHooksEnv
55
    - redefine HPATH and HTYPE
56
    - optionally redefine their run requirements:
57
        REQ_MASTER: the LU needs to run on the master node
58
        REQ_WSSTORE: the LU needs a writable SimpleStore
59
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60

61
  Note that all commands require root permissions.
62

63
  """
64
  HPATH = None
65
  HTYPE = None
66
  _OP_REQP = []
67
  REQ_MASTER = True
68
  REQ_WSSTORE = False
69
  REQ_BGL = True
70

    
71
  def __init__(self, processor, op, context, sstore):
72
    """Constructor for LogicalUnit.
73

74
    This needs to be overridden in derived classes in order to check op
75
    validity.
76

77
    """
78
    self.proc = processor
79
    self.op = op
80
    self.cfg = context.cfg
81
    self.sstore = sstore
82
    self.context = context
83
    # Dicts used to declare locking needs to mcpu
84
    self.needed_locks = None
85
    self.acquired_locks = {}
86
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
87
    self.add_locks = {}
88
    self.remove_locks = {}
89
    # Used to force good behavior when calling helper functions
90
    self.recalculate_locks = {}
91
    self.__ssh = None
92

    
93
    for attr_name in self._OP_REQP:
94
      attr_val = getattr(op, attr_name, None)
95
      if attr_val is None:
96
        raise errors.OpPrereqError("Required parameter '%s' missing" %
97
                                   attr_name)
98

    
99
    if not self.cfg.IsCluster():
100
      raise errors.OpPrereqError("Cluster not initialized yet,"
101
                                 " use 'gnt-cluster init' first.")
102
    if self.REQ_MASTER:
103
      master = sstore.GetMasterNode()
104
      if master != utils.HostInfo().name:
105
        raise errors.OpPrereqError("Commands must be run on the master"
106
                                   " node %s" % master)
107

    
108
  def __GetSSH(self):
109
    """Returns the SshRunner object
110

111
    """
112
    if not self.__ssh:
113
      self.__ssh = ssh.SshRunner(self.sstore)
114
    return self.__ssh
115

    
116
  ssh = property(fget=__GetSSH)
117

    
118
  def ExpandNames(self):
119
    """Expand names for this LU.
120

121
    This method is called before starting to execute the opcode, and it should
122
    update all the parameters of the opcode to their canonical form (e.g. a
123
    short node name must be fully expanded after this method has successfully
124
    completed). This way locking, hooks, logging, etc. can work correctly.
125

126
    LUs which implement this method must also populate the self.needed_locks
127
    member, as a dict with lock levels as keys, and a list of needed lock names
128
    as values. Rules:
129
      - Use an empty dict if you don't need any lock
130
      - If you don't need any lock at a particular level omit that level
131
      - Don't put anything for the BGL level
132
      - If you want all locks at a level use locking.ALL_SET as a value
133

134
    If you need to share locks (rather than acquire them exclusively) at one
135
    level you can modify self.share_locks, setting a true value (usually 1) for
136
    that level. By default locks are not shared.
137

138
    Examples:
139
    # Acquire all nodes and one instance
140
    self.needed_locks = {
141
      locking.LEVEL_NODE: locking.ALL_SET,
142
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
143
    }
144
    # Acquire just two nodes
145
    self.needed_locks = {
146
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
147
    }
148
    # Acquire no locks
149
    self.needed_locks = {} # No, you can't leave it to the default value None
150

151
    """
152
    # The implementation of this method is mandatory only if the new LU is
153
    # concurrent, so that old LUs don't need to be changed all at the same
154
    # time.
155
    if self.REQ_BGL:
156
      self.needed_locks = {} # Exclusive LUs don't need locks.
157
    else:
158
      raise NotImplementedError
159

    
160
  def DeclareLocks(self, level):
161
    """Declare LU locking needs for a level
162

163
    While most LUs can just declare their locking needs at ExpandNames time,
164
    sometimes there's the need to calculate some locks after having acquired
165
    the ones before. This function is called just before acquiring locks at a
166
    particular level, but after acquiring the ones at lower levels, and permits
167
    such calculations. It can be used to modify self.needed_locks, and by
168
    default it does nothing.
169

170
    This function is only called if you have something already set in
171
    self.needed_locks for the level.
172

173
    @param level: Locking level which is going to be locked
174
    @type level: member of ganeti.locking.LEVELS
175

176
    """
177

    
178
  def CheckPrereq(self):
179
    """Check prerequisites for this LU.
180

181
    This method should check that the prerequisites for the execution
182
    of this LU are fulfilled. It can do internode communication, but
183
    it should be idempotent - no cluster or system changes are
184
    allowed.
185

186
    The method should raise errors.OpPrereqError in case something is
187
    not fulfilled. Its return value is ignored.
188

189
    This method should also update all the parameters of the opcode to
190
    their canonical form if it hasn't been done by ExpandNames before.
191

192
    """
193
    raise NotImplementedError
194

    
195
  def Exec(self, feedback_fn):
196
    """Execute the LU.
197

198
    This method should implement the actual work. It should raise
199
    errors.OpExecError for failures that are somewhat dealt with in
200
    code, or expected.
201

202
    """
203
    raise NotImplementedError
204

    
205
  def BuildHooksEnv(self):
206
    """Build hooks environment for this LU.
207

208
    This method should return a three-element tuple consisting of: a dict
209
    containing the environment that will be used for running the
210
    specific hook for this LU, a list of node names on which the hook
211
    should run before the execution, and a list of node names on which
212
    the hook should run after the execution.
213

214
    The keys of the dict must not be prefixed with 'GANETI_', as this will
215
    be handled in the hooks runner. Also note additional keys will be
216
    added by the hooks runner. If the LU doesn't define any
217
    environment, an empty dict (and not None) should be returned.
218

219
    If there are no nodes, an empty list (and not None) should be returned.
220

221
    Note that if the HPATH for a LU class is None, this function will
222
    not be called.
223

224
    """
225
    raise NotImplementedError
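
  # Illustrative sketch (hypothetical, for documentation only) of what a
  # typical implementation returns: the environment dict plus the pre- and
  # post-hook node lists; the attribute names used below are examples.
  #
  #   def BuildHooksEnv(self):
  #     env = {
  #       "OP_TARGET": self.op.instance_name,
  #       "INSTANCE_NAME": self.op.instance_name,
  #       }
  #     nl = [self.sstore.GetMasterNode(), self.instance.primary_node]
  #     return env, nl, nl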
226

    
227
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
228
    """Notify the LU about the results of its hooks.
229

230
    This method is called every time a hooks phase is executed, and notifies
231
    the Logical Unit about the hooks' result. The LU can then use it to alter
232
    its result based on the hooks.  By default the method does nothing and the
233
    previous result is passed back unchanged, but any LU can override it if it
234
    wants to use the local cluster hook-scripts somehow.
235

236
    Args:
237
      phase: the hooks phase that has just been run
238
      hook_results: the results of the multi-node hooks rpc call
239
      feedback_fn: function to send feedback back to the caller
240
      lu_result: the previous result this LU had, or None in the PRE phase.
241

242
    """
243
    return lu_result
244

    
245
  def _ExpandAndLockInstance(self):
246
    """Helper function to expand and lock an instance.
247

248
    Many LUs that work on an instance take its name in self.op.instance_name
249
    and need to expand it and then declare the expanded name for locking. This
250
    function does it, and then updates self.op.instance_name to the expanded
251
    name. It also initializes needed_locks as a dict, if this hasn't been done
252
    before.
253

254
    """
255
    if self.needed_locks is None:
256
      self.needed_locks = {}
257
    else:
258
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
259
        "_ExpandAndLockInstance called with instance-level locks set"
260
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
261
    if expanded_name is None:
262
      raise errors.OpPrereqError("Instance '%s' not known" %
263
                                  self.op.instance_name)
264
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
265
    self.op.instance_name = expanded_name
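
  # Illustrative usage sketch (hypothetical): a concurrent instance LU would
  # typically call this helper from its ExpandNames and then have the node
  # locks computed later in DeclareLocks:
  #
  #   def ExpandNames(self):
  #     self._ExpandAndLockInstance()
  #     self.needed_locks[locking.LEVEL_NODE] = []
  #     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
  #
  #   def DeclareLocks(self, level):
  #     if level == locking.LEVEL_NODE:
  #       self._LockInstancesNodes()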
266

    
267
  def _LockInstancesNodes(self, primary_only=False):
268
    """Helper function to declare instances' nodes for locking.
269

270
    This function should be called after locking one or more instances to lock
271
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
272
    with all primary or secondary nodes for instances already locked and
273
    present in self.needed_locks[locking.LEVEL_INSTANCE].
274

275
    It should be called from DeclareLocks, and for safety only works if
276
    self.recalculate_locks[locking.LEVEL_NODE] is set.
277

278
    In the future it may grow parameters to just lock some instance's nodes, or
279
    to just lock primaries or secondary nodes, if needed.
280

281
    It should be called in DeclareLocks in a way similar to:
282

283
    if level == locking.LEVEL_NODE:
284
      self._LockInstancesNodes()
285

286
    @type primary_only: boolean
287
    @param primary_only: only lock primary nodes of locked instances
288

289
    """
290
    assert locking.LEVEL_NODE in self.recalculate_locks, \
291
      "_LockInstancesNodes helper function called with no nodes to recalculate"
292

    
293
    # TODO: check if we've really been called with the instance locks held
294

    
295
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
296
    # future we might want to have different behaviors depending on the value
297
    # of self.recalculate_locks[locking.LEVEL_NODE]
298
    wanted_nodes = []
299
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
300
      instance = self.context.cfg.GetInstanceInfo(instance_name)
301
      wanted_nodes.append(instance.primary_node)
302
      if not primary_only:
303
        wanted_nodes.extend(instance.secondary_nodes)
304

    
305
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
306
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
307
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
308
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
309

    
310
    del self.recalculate_locks[locking.LEVEL_NODE]
311

    
312

    
313
class NoHooksLU(LogicalUnit):
314
  """Simple LU which runs no hooks.
315

316
  This LU is intended as a parent for other LogicalUnits which will
317
  run no hooks, in order to reduce duplicate code.
318

319
  """
320
  HPATH = None
321
  HTYPE = None
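

# Example (hypothetical, for documentation only): a minimal LU following the
# rules listed in the LogicalUnit docstring - it implements ExpandNames,
# CheckPrereq and Exec, disables the Big Ganeti Lock and declares shared node
# locks. The opcode's 'nodes' attribute is an assumption made for the example.
class LUExampleListNodes(NoHooksLU):
  """Hypothetical LU returning the expanded names of the requested nodes."""
  _OP_REQP = ["nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    # Expand the user-supplied names and declare shared locks on those nodes
    self.needed_locks = {
      locking.LEVEL_NODE: _GetWantedNodes(self, self.op.nodes),
      }
    self.share_locks[locking.LEVEL_NODE] = 1

  def CheckPrereq(self):
    # Name expansion already validated the node names, nothing more to check
    pass

  def Exec(self, feedback_fn):
    # Return the names of the nodes whose locks were acquired
    return self.acquired_locks[locking.LEVEL_NODE]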
322

    
323

    
324
def _GetWantedNodes(lu, nodes):
325
  """Returns list of checked and expanded node names.
326

327
  Args:
328
    nodes: List of nodes (strings) or None for all
329

330
  """
331
  if not isinstance(nodes, list):
332
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
333

    
334
  if not nodes:
335
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
336
      " non-empty list of nodes whose name is to be expanded.")
337

    
338
  wanted = []
339
  for name in nodes:
340
    node = lu.cfg.ExpandNodeName(name)
341
    if node is None:
342
      raise errors.OpPrereqError("No such node name '%s'" % name)
343
    wanted.append(node)
344

    
345
  return utils.NiceSort(wanted)
346

    
347

    
348
def _GetWantedInstances(lu, instances):
349
  """Returns list of checked and expanded instance names.
350

351
  Args:
352
    instances: List of instances (strings) or None for all
353

354
  """
355
  if not isinstance(instances, list):
356
    raise errors.OpPrereqError("Invalid argument type 'instances'")
357

    
358
  if instances:
359
    wanted = []
360

    
361
    for name in instances:
362
      instance = lu.cfg.ExpandInstanceName(name)
363
      if instance is None:
364
        raise errors.OpPrereqError("No such instance name '%s'" % name)
365
      wanted.append(instance)
366

    
367
  else:
368
    wanted = lu.cfg.GetInstanceList()
369
  return utils.NiceSort(wanted)
370

    
371

    
372
def _CheckOutputFields(static, dynamic, selected):
373
  """Checks whether all selected fields are valid.
374

375
  Args:
376
    static: Static fields
377
    dynamic: Dynamic fields
378

379
  """
380
  static_fields = frozenset(static)
381
  dynamic_fields = frozenset(dynamic)
382

    
383
  all_fields = static_fields | dynamic_fields
384

    
385
  if not all_fields.issuperset(selected):
386
    raise errors.OpPrereqError("Unknown output fields selected: %s"
387
                               % ",".join(frozenset(selected).
388
                                          difference(all_fields)))
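

# Illustrative usage sketch (hypothetical field names): an LU passes its
# static and dynamic field names together with the user's selection; any
# unknown field makes the call raise OpPrereqError naming the offenders.
#
#   _CheckOutputFields(static=["name", "pip", "sip"],
#                      dynamic=["mfree", "dfree", "bootid"],
#                      selected=self.op.output_fields)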
389

    
390

    
391
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
392
                          memory, vcpus, nics):
393
  """Builds instance related env variables for hooks from single variables.
394

395
  Args:
396
    secondary_nodes: List of secondary nodes as strings
397
  """
398
  env = {
399
    "OP_TARGET": name,
400
    "INSTANCE_NAME": name,
401
    "INSTANCE_PRIMARY": primary_node,
402
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
403
    "INSTANCE_OS_TYPE": os_type,
404
    "INSTANCE_STATUS": status,
405
    "INSTANCE_MEMORY": memory,
406
    "INSTANCE_VCPUS": vcpus,
407
  }
408

    
409
  if nics:
410
    nic_count = len(nics)
411
    for idx, (ip, bridge, mac) in enumerate(nics):
412
      if ip is None:
413
        ip = ""
414
      env["INSTANCE_NIC%d_IP" % idx] = ip
415
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
416
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
417
  else:
418
    nic_count = 0
419

    
420
  env["INSTANCE_NIC_COUNT"] = nic_count
421

    
422
  return env
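

# Illustrative sketch (hypothetical helper and values, for documentation
# only): shows the kind of environment the function above produces.
def _ExampleInstanceHookEnv():
  """Hypothetical sample of the hook environment for a one-NIC instance."""
  env = _BuildInstanceHookEnv("instance1.example.tld", "node1.example.tld",
                              ["node2.example.tld"], "debian-etch", "up",
                              128, 1,
                              [("198.51.100.10", "xen-br0",
                                "aa:00:00:00:00:01")])
  # env now contains, among others: INSTANCE_PRIMARY=node1.example.tld,
  # INSTANCE_MEMORY=128, INSTANCE_NIC_COUNT=1, INSTANCE_NIC0_BRIDGE=xen-br0
  return env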
423

    
424

    
425
def _BuildInstanceHookEnvByObject(instance, override=None):
426
  """Builds instance related env variables for hooks from an object.
427

428
  Args:
429
    instance: objects.Instance object of instance
430
    override: dict of values to override
431
  """
432
  args = {
433
    'name': instance.name,
434
    'primary_node': instance.primary_node,
435
    'secondary_nodes': instance.secondary_nodes,
436
    'os_type': instance.os,
437
    'status': instance.status,
438
    'memory': instance.memory,
439
    'vcpus': instance.vcpus,
440
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
441
  }
442
  if override:
443
    args.update(override)
444
  return _BuildInstanceHookEnv(**args)
445

    
446

    
447
def _CheckInstanceBridgesExist(instance):
448
  """Check that the brigdes needed by an instance exist.
449

450
  """
451
  # check bridges existence
452
  brlist = [nic.bridge for nic in instance.nics]
453
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
454
    raise errors.OpPrereqError("one or more target bridges %s does not"
455
                               " exist on destination node '%s'" %
456
                               (brlist, instance.primary_node))
457

    
458

    
459
class LUDestroyCluster(NoHooksLU):
460
  """Logical unit for destroying the cluster.
461

462
  """
463
  _OP_REQP = []
464

    
465
  def CheckPrereq(self):
466
    """Check prerequisites.
467

468
    This checks whether the cluster is empty.
469

470
    Any errors are signalled by raising errors.OpPrereqError.
471

472
    """
473
    master = self.sstore.GetMasterNode()
474

    
475
    nodelist = self.cfg.GetNodeList()
476
    if len(nodelist) != 1 or nodelist[0] != master:
477
      raise errors.OpPrereqError("There are still %d node(s) in"
478
                                 " this cluster." % (len(nodelist) - 1))
479
    instancelist = self.cfg.GetInstanceList()
480
    if instancelist:
481
      raise errors.OpPrereqError("There are still %d instance(s) in"
482
                                 " this cluster." % len(instancelist))
483

    
484
  def Exec(self, feedback_fn):
485
    """Destroys the cluster.
486

487
    """
488
    master = self.sstore.GetMasterNode()
489
    if not rpc.call_node_stop_master(master, False):
490
      raise errors.OpExecError("Could not disable the master role")
491
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
492
    utils.CreateBackup(priv_key)
493
    utils.CreateBackup(pub_key)
494
    return master
495

    
496

    
497
class LUVerifyCluster(LogicalUnit):
498
  """Verifies the cluster status.
499

500
  """
501
  HPATH = "cluster-verify"
502
  HTYPE = constants.HTYPE_CLUSTER
503
  _OP_REQP = ["skip_checks"]
504
  REQ_BGL = False
505

    
506
  def ExpandNames(self):
507
    self.needed_locks = {
508
      locking.LEVEL_NODE: locking.ALL_SET,
509
      locking.LEVEL_INSTANCE: locking.ALL_SET,
510
    }
511
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
512

    
513
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
514
                  remote_version, feedback_fn):
515
    """Run multiple tests against a node.
516

517
    Test list:
518
      - compares ganeti version
519
      - checks vg existence and size > 20G
520
      - checks config file checksum
521
      - checks ssh to other nodes
522

523
    Args:
524
      node: name of the node to check
525
      file_list: required list of files
526
      local_cksum: dictionary of local files and their checksums
527

528
    """
529
    # compares ganeti version
530
    local_version = constants.PROTOCOL_VERSION
531
    if not remote_version:
532
      feedback_fn("  - ERROR: connection to %s failed" % (node))
533
      return True
534

    
535
    if local_version != remote_version:
536
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
537
                      (local_version, node, remote_version))
538
      return True
539

    
540
    # checks vg existence and size > 20G
541

    
542
    bad = False
543
    if not vglist:
544
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
545
                      (node,))
546
      bad = True
547
    else:
548
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
549
                                            constants.MIN_VG_SIZE)
550
      if vgstatus:
551
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
552
        bad = True
553

    
554
    # checks config file checksum
555
    # checks ssh to any
556

    
557
    if 'filelist' not in node_result:
558
      bad = True
559
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
560
    else:
561
      remote_cksum = node_result['filelist']
562
      for file_name in file_list:
563
        if file_name not in remote_cksum:
564
          bad = True
565
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
566
        elif remote_cksum[file_name] != local_cksum[file_name]:
567
          bad = True
568
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
569

    
570
    if 'nodelist' not in node_result:
571
      bad = True
572
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
573
    else:
574
      if node_result['nodelist']:
575
        bad = True
576
        for node in node_result['nodelist']:
577
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
578
                          (node, node_result['nodelist'][node]))
579
    if 'node-net-test' not in node_result:
580
      bad = True
581
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
582
    else:
583
      if node_result['node-net-test']:
584
        bad = True
585
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
586
        for node in nlist:
587
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
588
                          (node, node_result['node-net-test'][node]))
589

    
590
    hyp_result = node_result.get('hypervisor', None)
591
    if hyp_result is not None:
592
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
593
    return bad
594

    
595
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
596
                      node_instance, feedback_fn):
597
    """Verify an instance.
598

599
    This function checks to see if the required block devices are
600
    available on the instance's node.
601

602
    """
603
    bad = False
604

    
605
    node_current = instanceconfig.primary_node
606

    
607
    node_vol_should = {}
608
    instanceconfig.MapLVsByNode(node_vol_should)
609

    
610
    for node in node_vol_should:
611
      for volume in node_vol_should[node]:
612
        if node not in node_vol_is or volume not in node_vol_is[node]:
613
          feedback_fn("  - ERROR: volume %s missing on node %s" %
614
                          (volume, node))
615
          bad = True
616

    
617
    if not instanceconfig.status == 'down':
618
      if (node_current not in node_instance or
619
          not instance in node_instance[node_current]):
620
        feedback_fn("  - ERROR: instance %s not running on node %s" %
621
                        (instance, node_current))
622
        bad = True
623

    
624
    for node in node_instance:
625
      if (not node == node_current):
626
        if instance in node_instance[node]:
627
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
628
                          (instance, node))
629
          bad = True
630

    
631
    return bad
632

    
633
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
634
    """Verify if there are any unknown volumes in the cluster.
635

636
    The .os, .swap and backup volumes are ignored. All other volumes are
637
    reported as unknown.
638

639
    """
640
    bad = False
641

    
642
    for node in node_vol_is:
643
      for volume in node_vol_is[node]:
644
        if node not in node_vol_should or volume not in node_vol_should[node]:
645
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
646
                      (volume, node))
647
          bad = True
648
    return bad
649

    
650
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
651
    """Verify the list of running instances.
652

653
    This checks what instances are running but unknown to the cluster.
654

655
    """
656
    bad = False
657
    for node in node_instance:
658
      for runninginstance in node_instance[node]:
659
        if runninginstance not in instancelist:
660
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
661
                          (runninginstance, node))
662
          bad = True
663
    return bad
664

    
665
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
666
    """Verify N+1 Memory Resilience.
667

668
    Check that if one single node dies we can still start all the instances it
669
    was primary for.
670

671
    """
672
    bad = False
673

    
674
    for node, nodeinfo in node_info.iteritems():
675
      # This code checks that every node which is now listed as secondary has
676
      # enough memory to host all instances it is supposed to should a single
677
      # other node in the cluster fail.
678
      # FIXME: not ready for failover to an arbitrary node
679
      # FIXME: does not support file-backed instances
680
      # WARNING: we currently take into account down instances as well as up
681
      # ones, considering that even if they're down someone might want to start
682
      # them even in the event of a node failure.
683
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
684
        needed_mem = 0
685
        for instance in instances:
686
          needed_mem += instance_cfg[instance].memory
687
        if nodeinfo['mfree'] < needed_mem:
688
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
689
                      " failovers should node %s fail" % (node, prinode))
690
          bad = True
691
    return bad
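
  # Worked example (illustrative numbers only): suppose node C has
  # mfree=1024 MiB and is secondary for two instances whose primary is node
  # A, needing 512 and 768 MiB. Should A fail, C would need 512+768=1280 MiB,
  # which exceeds 1024, so the check above flags C as not N+1 compliant.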
692

    
693
  def CheckPrereq(self):
694
    """Check prerequisites.
695

696
    Transform the list of checks we're going to skip into a set and check that
697
    all its members are valid.
698

699
    """
700
    self.skip_set = frozenset(self.op.skip_checks)
701
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
702
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
703

    
704
  def BuildHooksEnv(self):
705
    """Build hooks env.
706

707
    Cluster-Verify hooks are only run in the post phase and their failure makes
708
    the output be logged in the verify output and the verification fail.
709

710
    """
711
    all_nodes = self.cfg.GetNodeList()
712
    # TODO: populate the environment with useful information for verify hooks
713
    env = {}
714
    return env, [], all_nodes
715

    
716
  def Exec(self, feedback_fn):
717
    """Verify integrity of cluster, performing various test on nodes.
718

719
    """
720
    bad = False
721
    feedback_fn("* Verifying global settings")
722
    for msg in self.cfg.VerifyConfig():
723
      feedback_fn("  - ERROR: %s" % msg)
724

    
725
    vg_name = self.cfg.GetVGName()
726
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
727
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
728
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
729
    i_non_redundant = [] # Non-redundant instances
730
    node_volume = {}
731
    node_instance = {}
732
    node_info = {}
733
    instance_cfg = {}
734

    
735
    # FIXME: verify OS list
736
    # do local checksums
737
    file_names = list(self.sstore.GetFileList())
738
    file_names.append(constants.SSL_CERT_FILE)
739
    file_names.append(constants.CLUSTER_CONF_FILE)
740
    local_checksums = utils.FingerprintFiles(file_names)
741

    
742
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
743
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
744
    all_instanceinfo = rpc.call_instance_list(nodelist)
745
    all_vglist = rpc.call_vg_list(nodelist)
746
    node_verify_param = {
747
      'filelist': file_names,
748
      'nodelist': nodelist,
749
      'hypervisor': None,
750
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
751
                        for node in nodeinfo]
752
      }
753
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
754
    all_rversion = rpc.call_version(nodelist)
755
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
756

    
757
    for node in nodelist:
758
      feedback_fn("* Verifying node %s" % node)
759
      result = self._VerifyNode(node, file_names, local_checksums,
760
                                all_vglist[node], all_nvinfo[node],
761
                                all_rversion[node], feedback_fn)
762
      bad = bad or result
763

    
764
      # node_volume
765
      volumeinfo = all_volumeinfo[node]
766

    
767
      if isinstance(volumeinfo, basestring):
768
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
769
                    (node, volumeinfo[-400:].encode('string_escape')))
770
        bad = True
771
        node_volume[node] = {}
772
      elif not isinstance(volumeinfo, dict):
773
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
774
        bad = True
775
        continue
776
      else:
777
        node_volume[node] = volumeinfo
778

    
779
      # node_instance
780
      nodeinstance = all_instanceinfo[node]
781
      if type(nodeinstance) != list:
782
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
783
        bad = True
784
        continue
785

    
786
      node_instance[node] = nodeinstance
787

    
788
      # node_info
789
      nodeinfo = all_ninfo[node]
790
      if not isinstance(nodeinfo, dict):
791
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
792
        bad = True
793
        continue
794

    
795
      try:
796
        node_info[node] = {
797
          "mfree": int(nodeinfo['memory_free']),
798
          "dfree": int(nodeinfo['vg_free']),
799
          "pinst": [],
800
          "sinst": [],
801
          # dictionary holding all instances this node is secondary for,
802
          # grouped by their primary node. Each key is a cluster node, and each
803
          # value is a list of instances which have the key as primary and the
804
          # current node as secondary. This is handy to calculate N+1 memory
805
          # availability if you can only failover from a primary to its
806
          # secondary.
807
          "sinst-by-pnode": {},
808
        }
809
      except ValueError:
810
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
811
        bad = True
812
        continue
813

    
814
    node_vol_should = {}
815

    
816
    for instance in instancelist:
817
      feedback_fn("* Verifying instance %s" % instance)
818
      inst_config = self.cfg.GetInstanceInfo(instance)
819
      result = self._VerifyInstance(instance, inst_config, node_volume,
820
                                     node_instance, feedback_fn)
821
      bad = bad or result
822

    
823
      inst_config.MapLVsByNode(node_vol_should)
824

    
825
      instance_cfg[instance] = inst_config
826

    
827
      pnode = inst_config.primary_node
828
      if pnode in node_info:
829
        node_info[pnode]['pinst'].append(instance)
830
      else:
831
        feedback_fn("  - ERROR: instance %s, connection to primary node"
832
                    " %s failed" % (instance, pnode))
833
        bad = True
834

    
835
      # If the instance is non-redundant we cannot survive losing its primary
836
      # node, so we are not N+1 compliant. On the other hand we have no disk
837
      # templates with more than one secondary so that situation is not well
838
      # supported either.
839
      # FIXME: does not support file-backed instances
840
      if len(inst_config.secondary_nodes) == 0:
841
        i_non_redundant.append(instance)
842
      elif len(inst_config.secondary_nodes) > 1:
843
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
844
                    % instance)
845

    
846
      for snode in inst_config.secondary_nodes:
847
        if snode in node_info:
848
          node_info[snode]['sinst'].append(instance)
849
          if pnode not in node_info[snode]['sinst-by-pnode']:
850
            node_info[snode]['sinst-by-pnode'][pnode] = []
851
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
852
        else:
853
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
854
                      " %s failed" % (instance, snode))
855

    
856
    feedback_fn("* Verifying orphan volumes")
857
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
858
                                       feedback_fn)
859
    bad = bad or result
860

    
861
    feedback_fn("* Verifying remaining instances")
862
    result = self._VerifyOrphanInstances(instancelist, node_instance,
863
                                         feedback_fn)
864
    bad = bad or result
865

    
866
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
867
      feedback_fn("* Verifying N+1 Memory redundancy")
868
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
869
      bad = bad or result
870

    
871
    feedback_fn("* Other Notes")
872
    if i_non_redundant:
873
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
874
                  % len(i_non_redundant))
875

    
876
    return not bad
877

    
878
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
879
    """Analize the post-hooks' result, handle it, and send some
880
    nicely-formatted feedback back to the user.
881

882
    Args:
883
      phase: the hooks phase that has just been run
884
      hooks_results: the results of the multi-node hooks rpc call
885
      feedback_fn: function to send feedback back to the caller
886
      lu_result: previous Exec result
887

888
    """
889
    # We only really run POST phase hooks, and are only interested in
890
    # their results
891
    if phase == constants.HOOKS_PHASE_POST:
892
      # Used to change hooks' output to proper indentation
893
      indent_re = re.compile('^', re.M)
894
      feedback_fn("* Hooks Results")
895
      if not hooks_results:
896
        feedback_fn("  - ERROR: general communication failure")
897
        lu_result = 1
898
      else:
899
        for node_name in hooks_results:
900
          show_node_header = True
901
          res = hooks_results[node_name]
902
          if res is False or not isinstance(res, list):
903
            feedback_fn("    Communication failure")
904
            lu_result = 1
905
            continue
906
          for script, hkr, output in res:
907
            if hkr == constants.HKR_FAIL:
908
              # The node header is only shown once, if there are
909
              # failing hooks on that node
910
              if show_node_header:
911
                feedback_fn("  Node %s:" % node_name)
912
                show_node_header = False
913
              feedback_fn("    ERROR: Script %s failed, output:" % script)
914
              output = indent_re.sub('      ', output)
915
              feedback_fn("%s" % output)
916
              lu_result = 1
917

    
918
      return lu_result
919

    
920

    
921
class LUVerifyDisks(NoHooksLU):
922
  """Verifies the cluster disks status.
923

924
  """
925
  _OP_REQP = []
926
  REQ_BGL = False
927

    
928
  def ExpandNames(self):
929
    self.needed_locks = {
930
      locking.LEVEL_NODE: locking.ALL_SET,
931
      locking.LEVEL_INSTANCE: locking.ALL_SET,
932
    }
933
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
934

    
935
  def CheckPrereq(self):
936
    """Check prerequisites.
937

938
    This has no prerequisites.
939

940
    """
941
    pass
942

    
943
  def Exec(self, feedback_fn):
944
    """Verify integrity of cluster disks.
945

946
    """
947
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
948

    
949
    vg_name = self.cfg.GetVGName()
950
    nodes = utils.NiceSort(self.cfg.GetNodeList())
951
    instances = [self.cfg.GetInstanceInfo(name)
952
                 for name in self.cfg.GetInstanceList()]
953

    
954
    nv_dict = {}
955
    for inst in instances:
956
      inst_lvs = {}
957
      if (inst.status != "up" or
958
          inst.disk_template not in constants.DTS_NET_MIRROR):
959
        continue
960
      inst.MapLVsByNode(inst_lvs)
961
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
962
      for node, vol_list in inst_lvs.iteritems():
963
        for vol in vol_list:
964
          nv_dict[(node, vol)] = inst
965

    
966
    if not nv_dict:
967
      return result
968

    
969
    node_lvs = rpc.call_volume_list(nodes, vg_name)
970

    
971
    to_act = set()
972
    for node in nodes:
973
      # node_volume
974
      lvs = node_lvs[node]
975

    
976
      if isinstance(lvs, basestring):
977
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
978
        res_nlvm[node] = lvs
979
      elif not isinstance(lvs, dict):
980
        logger.Info("connection to node %s failed or invalid data returned" %
981
                    (node,))
982
        res_nodes.append(node)
983
        continue
984

    
985
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
986
        inst = nv_dict.pop((node, lv_name), None)
987
        if (not lv_online and inst is not None
988
            and inst.name not in res_instances):
989
          res_instances.append(inst.name)
990

    
991
    # any leftover items in nv_dict are missing LVs, let's arrange the
992
    # data better
993
    for key, inst in nv_dict.iteritems():
994
      if inst.name not in res_missing:
995
        res_missing[inst.name] = []
996
      res_missing[inst.name].append(key)
997

    
998
    return result
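
  # Descriptive note: the returned value is the 4-tuple (res_nodes, res_nlvm,
  # res_instances, res_missing): nodes that could not be queried, per-node
  # LVM error messages, names of instances with inactive logical volumes, and
  # a map from instance name to the (node, volume) pairs missing entirely.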
999

    
1000

    
1001
class LURenameCluster(LogicalUnit):
1002
  """Rename the cluster.
1003

1004
  """
1005
  HPATH = "cluster-rename"
1006
  HTYPE = constants.HTYPE_CLUSTER
1007
  _OP_REQP = ["name"]
1008
  REQ_WSSTORE = True
1009

    
1010
  def BuildHooksEnv(self):
1011
    """Build hooks env.
1012

1013
    """
1014
    env = {
1015
      "OP_TARGET": self.sstore.GetClusterName(),
1016
      "NEW_NAME": self.op.name,
1017
      }
1018
    mn = self.sstore.GetMasterNode()
1019
    return env, [mn], [mn]
1020

    
1021
  def CheckPrereq(self):
1022
    """Verify that the passed name is a valid one.
1023

1024
    """
1025
    hostname = utils.HostInfo(self.op.name)
1026

    
1027
    new_name = hostname.name
1028
    self.ip = new_ip = hostname.ip
1029
    old_name = self.sstore.GetClusterName()
1030
    old_ip = self.sstore.GetMasterIP()
1031
    if new_name == old_name and new_ip == old_ip:
1032
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1033
                                 " cluster has changed")
1034
    if new_ip != old_ip:
1035
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1036
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1037
                                   " reachable on the network. Aborting." %
1038
                                   new_ip)
1039

    
1040
    self.op.name = new_name
1041

    
1042
  def Exec(self, feedback_fn):
1043
    """Rename the cluster.
1044

1045
    """
1046
    clustername = self.op.name
1047
    ip = self.ip
1048
    ss = self.sstore
1049

    
1050
    # shutdown the master IP
1051
    master = ss.GetMasterNode()
1052
    if not rpc.call_node_stop_master(master, False):
1053
      raise errors.OpExecError("Could not disable the master role")
1054

    
1055
    try:
1056
      # modify the sstore
1057
      ss.SetKey(ss.SS_MASTER_IP, ip)
1058
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1059

    
1060
      # Distribute updated ss config to all nodes
1061
      myself = self.cfg.GetNodeInfo(master)
1062
      dist_nodes = self.cfg.GetNodeList()
1063
      if myself.name in dist_nodes:
1064
        dist_nodes.remove(myself.name)
1065

    
1066
      logger.Debug("Copying updated ssconf data to all nodes")
1067
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1068
        fname = ss.KeyToFilename(keyname)
1069
        result = rpc.call_upload_file(dist_nodes, fname)
1070
        for to_node in dist_nodes:
1071
          if not result[to_node]:
1072
            logger.Error("copy of file %s to node %s failed" %
1073
                         (fname, to_node))
1074
    finally:
1075
      if not rpc.call_node_start_master(master, False):
1076
        logger.Error("Could not re-enable the master role on the master,"
1077
                     " please restart manually.")
1078

    
1079

    
1080
def _RecursiveCheckIfLVMBased(disk):
1081
  """Check if the given disk or its children are lvm-based.
1082

1083
  Args:
1084
    disk: ganeti.objects.Disk object
1085

1086
  Returns:
1087
    boolean indicating whether a LD_LV dev_type was found or not
1088

1089
  """
1090
  if disk.children:
1091
    for chdisk in disk.children:
1092
      if _RecursiveCheckIfLVMBased(chdisk):
1093
        return True
1094
  return disk.dev_type == constants.LD_LV
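

# Descriptive note: for the common DRBD-over-LVM layout the top-level disk is
# not itself an LV, but its children (the data and metadata logical volumes)
# are, so the recursion above returns True; a disk tree containing no LD_LV
# device anywhere returns False.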
1095

    
1096

    
1097
class LUSetClusterParams(LogicalUnit):
1098
  """Change the parameters of the cluster.
1099

1100
  """
1101
  HPATH = "cluster-modify"
1102
  HTYPE = constants.HTYPE_CLUSTER
1103
  _OP_REQP = []
1104
  REQ_BGL = False
1105

    
1106
  def ExpandNames(self):
1107
    # FIXME: in the future maybe other cluster params won't require checking on
1108
    # all nodes to be modified.
1109
    self.needed_locks = {
1110
      locking.LEVEL_NODE: locking.ALL_SET,
1111
    }
1112
    self.share_locks[locking.LEVEL_NODE] = 1
1113

    
1114
  def BuildHooksEnv(self):
1115
    """Build hooks env.
1116

1117
    """
1118
    env = {
1119
      "OP_TARGET": self.sstore.GetClusterName(),
1120
      "NEW_VG_NAME": self.op.vg_name,
1121
      }
1122
    mn = self.sstore.GetMasterNode()
1123
    return env, [mn], [mn]
1124

    
1125
  def CheckPrereq(self):
1126
    """Check prerequisites.
1127

1128
    This checks whether the given params don't conflict and
1129
    if the given volume group is valid.
1130

1131
    """
1132
    # FIXME: This only works because there is only one parameter that can be
1133
    # changed or removed.
1134
    if not self.op.vg_name:
1135
      instances = self.cfg.GetAllInstancesInfo().values()
1136
      for inst in instances:
1137
        for disk in inst.disks:
1138
          if _RecursiveCheckIfLVMBased(disk):
1139
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1140
                                       " lvm-based instances exist")
1141

    
1142
    # if vg_name not None, checks given volume group on all nodes
1143
    if self.op.vg_name:
1144
      node_list = self.acquired_locks[locking.LEVEL_NODE]
1145
      vglist = rpc.call_vg_list(node_list)
1146
      for node in node_list:
1147
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1148
                                              constants.MIN_VG_SIZE)
1149
        if vgstatus:
1150
          raise errors.OpPrereqError("Error on node '%s': %s" %
1151
                                     (node, vgstatus))
1152

    
1153
  def Exec(self, feedback_fn):
1154
    """Change the parameters of the cluster.
1155

1156
    """
1157
    if self.op.vg_name != self.cfg.GetVGName():
1158
      self.cfg.SetVGName(self.op.vg_name)
1159
    else:
1160
      feedback_fn("Cluster LVM configuration already in desired"
1161
                  " state, not changing")
1162

    
1163

    
1164
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1165
  """Sleep and poll for an instance's disk to sync.
1166

1167
  """
1168
  if not instance.disks:
1169
    return True
1170

    
1171
  if not oneshot:
1172
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1173

    
1174
  node = instance.primary_node
1175

    
1176
  for dev in instance.disks:
1177
    cfgw.SetDiskID(dev, node)
1178

    
1179
  retries = 0
1180
  while True:
1181
    max_time = 0
1182
    done = True
1183
    cumul_degraded = False
1184
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1185
    if not rstats:
1186
      proc.LogWarning("Can't get any data from node %s" % node)
1187
      retries += 1
1188
      if retries >= 10:
1189
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1190
                                 " aborting." % node)
1191
      time.sleep(6)
1192
      continue
1193
    retries = 0
1194
    for i in range(len(rstats)):
1195
      mstat = rstats[i]
1196
      if mstat is None:
1197
        proc.LogWarning("Can't compute data for node %s/%s" %
1198
                        (node, instance.disks[i].iv_name))
1199
        continue
1200
      # we ignore the ldisk parameter
1201
      perc_done, est_time, is_degraded, _ = mstat
1202
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1203
      if perc_done is not None:
1204
        done = False
1205
        if est_time is not None:
1206
          rem_time = "%d estimated seconds remaining" % est_time
1207
          max_time = est_time
1208
        else:
1209
          rem_time = "no time estimate"
1210
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
1211
                     (instance.disks[i].iv_name, perc_done, rem_time))
1212
    if done or oneshot:
1213
      break
1214

    
1215
    time.sleep(min(60, max_time))
1216

    
1217
  if done:
1218
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1219
  return not cumul_degraded
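

# Descriptive note on the timing above: up to 10 consecutive failed status
# calls are tolerated, 6 seconds apart (roughly one minute) before
# RemoteError is raised; between successful polls the loop sleeps
# min(60, estimated_time) seconds and stops once no disk reports an
# in-progress resync (or after a single successful poll in oneshot mode).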
1220

    
1221

    
1222
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1223
  """Check that mirrors are not degraded.
1224

1225
  The ldisk parameter, if True, will change the test from the
1226
  is_degraded attribute (which represents overall non-ok status for
1227
  the device(s)) to the ldisk (representing the local storage status).
1228

1229
  """
1230
  cfgw.SetDiskID(dev, node)
1231
  if ldisk:
1232
    idx = 6
1233
  else:
1234
    idx = 5
1235

    
1236
  result = True
1237
  if on_primary or dev.AssembleOnSecondary():
1238
    rstats = rpc.call_blockdev_find(node, dev)
1239
    if not rstats:
1240
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1241
      result = False
1242
    else:
1243
      result = result and (not rstats[idx])
1244
  if dev.children:
1245
    for child in dev.children:
1246
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1247

    
1248
  return result
1249

    
1250

    
1251
class LUDiagnoseOS(NoHooksLU):
1252
  """Logical unit for OS diagnose/query.
1253

1254
  """
1255
  _OP_REQP = ["output_fields", "names"]
1256
  REQ_BGL = False
1257

    
1258
  def ExpandNames(self):
1259
    if self.op.names:
1260
      raise errors.OpPrereqError("Selective OS query not supported")
1261

    
1262
    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1263
    _CheckOutputFields(static=[],
1264
                       dynamic=self.dynamic_fields,
1265
                       selected=self.op.output_fields)
1266

    
1267
    # Lock all nodes, in shared mode
1268
    self.needed_locks = {}
1269
    self.share_locks[locking.LEVEL_NODE] = 1
1270
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1271

    
1272
  def CheckPrereq(self):
1273
    """Check prerequisites.
1274

1275
    """
1276

    
1277
  @staticmethod
1278
  def _DiagnoseByOS(node_list, rlist):
1279
    """Remaps a per-node return list into an a per-os per-node dictionary
1280

1281
      Args:
1282
        node_list: a list with the names of all nodes
1283
        rlist: a map with node names as keys and OS objects as values
1284

1285
      Returns:
1286
        map: a map with osnames as keys and as value another map, with
1287
             nodes as keys and lists of OS objects as values
1289
             e.g. {"debian-etch": {"node1": [<object>,...],
1290
                                   "node2": [<object>,]}
1291
                  }
1292

1293
    """
1294
    all_os = {}
1295
    for node_name, nr in rlist.iteritems():
1296
      if not nr:
1297
        continue
1298
      for os_obj in nr:
1299
        if os_obj.name not in all_os:
1300
          # build a list of nodes for this os containing empty lists
1301
          # for each node in node_list
1302
          all_os[os_obj.name] = {}
1303
          for nname in node_list:
1304
            all_os[os_obj.name][nname] = []
1305
        all_os[os_obj.name][node_name].append(os_obj)
1306
    return all_os
1307

    
1308
  def Exec(self, feedback_fn):
1309
    """Compute the list of OSes.
1310

1311
    """
1312
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1313
    node_data = rpc.call_os_diagnose(node_list)
1314
    if node_data == False:
1315
      raise errors.OpExecError("Can't gather the list of OSes")
1316
    pol = self._DiagnoseByOS(node_list, node_data)
1317
    output = []
1318
    for os_name, os_data in pol.iteritems():
1319
      row = []
1320
      for field in self.op.output_fields:
1321
        if field == "name":
1322
          val = os_name
1323
        elif field == "valid":
1324
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1325
        elif field == "node_status":
1326
          val = {}
1327
          for node_name, nos_list in os_data.iteritems():
1328
            val[node_name] = [(v.status, v.path) for v in nos_list]
1329
        else:
1330
          raise errors.ParameterError(field)
1331
        row.append(val)
1332
      output.append(row)
1333

    
1334
    return output
1335

    
1336

    
1337
class LURemoveNode(LogicalUnit):
1338
  """Logical unit for removing a node.
1339

1340
  """
1341
  HPATH = "node-remove"
1342
  HTYPE = constants.HTYPE_NODE
1343
  _OP_REQP = ["node_name"]
1344

    
1345
  def BuildHooksEnv(self):
1346
    """Build hooks env.
1347

1348
    This doesn't run on the target node in the pre phase as a failed
1349
    node would then be impossible to remove.
1350

1351
    """
1352
    env = {
1353
      "OP_TARGET": self.op.node_name,
1354
      "NODE_NAME": self.op.node_name,
1355
      }
1356
    all_nodes = self.cfg.GetNodeList()
1357
    all_nodes.remove(self.op.node_name)
1358
    return env, all_nodes, all_nodes
1359

    
1360
  def CheckPrereq(self):
1361
    """Check prerequisites.
1362

1363
    This checks:
1364
     - the node exists in the configuration
1365
     - it does not have primary or secondary instances
1366
     - it's not the master
1367

1368
    Any errors are signalled by raising errors.OpPrereqError.
1369

1370
    """
1371
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1372
    if node is None:
1373
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1374

    
1375
    instance_list = self.cfg.GetInstanceList()
1376

    
1377
    masternode = self.sstore.GetMasterNode()
1378
    if node.name == masternode:
1379
      raise errors.OpPrereqError("Node is the master node,"
1380
                                 " you need to failover first.")
1381

    
1382
    for instance_name in instance_list:
1383
      instance = self.cfg.GetInstanceInfo(instance_name)
1384
      if node.name == instance.primary_node:
1385
        raise errors.OpPrereqError("Instance %s still running on the node,"
1386
                                   " please remove first." % instance_name)
1387
      if node.name in instance.secondary_nodes:
1388
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1389
                                   " please remove first." % instance_name)
1390
    self.op.node_name = node.name
1391
    self.node = node
1392

    
1393
  def Exec(self, feedback_fn):
1394
    """Removes the node from the cluster.
1395

1396
    """
1397
    node = self.node
1398
    logger.Info("stopping the node daemon and removing configs from node %s" %
1399
                node.name)
1400

    
1401
    self.context.RemoveNode(node.name)
1402

    
1403
    rpc.call_node_leave_cluster(node.name)
1404

    
1405

    
1406
class LUQueryNodes(NoHooksLU):
1407
  """Logical unit for querying nodes.
1408

1409
  """
1410
  _OP_REQP = ["output_fields", "names"]
1411
  REQ_BGL = False
1412

    
1413
  def ExpandNames(self):
1414
    self.dynamic_fields = frozenset([
1415
      "dtotal", "dfree",
1416
      "mtotal", "mnode", "mfree",
1417
      "bootid",
1418
      "ctotal",
1419
      ])
1420

    
1421
    self.static_fields = frozenset([
1422
      "name", "pinst_cnt", "sinst_cnt",
1423
      "pinst_list", "sinst_list",
1424
      "pip", "sip", "tags",
1425
      ])
1426

    
1427
    _CheckOutputFields(static=self.static_fields,
1428
                       dynamic=self.dynamic_fields,
1429
                       selected=self.op.output_fields)
1430

    
1431
    self.needed_locks = {}
1432
    self.share_locks[locking.LEVEL_NODE] = 1
1433

    
1434
    if self.op.names:
1435
      self.wanted = _GetWantedNodes(self, self.op.names)
1436
    else:
1437
      self.wanted = locking.ALL_SET
1438

    
1439
    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
1440
    if self.do_locking:
1441
      # if we don't request only static fields, we need to lock the nodes
1442
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
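
    # Descriptive note: when only static fields such as "name" or "pip" are
    # requested the data comes entirely from the configuration and no node
    # locks are taken; any dynamic field (e.g. "mfree") forces locking and a
    # live node_info RPC in Exec below.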
1443

    
1444

    
1445
  def CheckPrereq(self):
1446
    """Check prerequisites.
1447

1448
    """
1449
    # The validation of the node list is done in _GetWantedNodes if the list
1450
    # is non-empty; if it is empty, there is no validation to do
1451
    pass
1452

    
1453
  def Exec(self, feedback_fn):
1454
    """Computes the list of nodes and their attributes.
1455

1456
    """
1457
    all_info = self.cfg.GetAllNodesInfo()
1458
    if self.do_locking:
1459
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1460
    else:
1461
      nodenames = all_info.keys()
1462
    nodelist = [all_info[name] for name in nodenames]
1463

    
1464
    # begin data gathering
1465

    
1466
    if self.dynamic_fields.intersection(self.op.output_fields):
1467
      live_data = {}
1468
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1469
      for name in nodenames:
1470
        nodeinfo = node_data.get(name, None)
1471
        if nodeinfo:
1472
          live_data[name] = {
1473
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1474
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1475
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1476
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1477
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1478
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1479
            "bootid": nodeinfo['bootid'],
1480
            }
1481
        else:
1482
          live_data[name] = {}
1483
    else:
1484
      live_data = dict.fromkeys(nodenames, {})
1485

    
1486
    node_to_primary = dict([(name, set()) for name in nodenames])
1487
    node_to_secondary = dict([(name, set()) for name in nodenames])
1488

    
1489
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1490
                             "sinst_cnt", "sinst_list"))
1491
    if inst_fields & frozenset(self.op.output_fields):
1492
      instancelist = self.cfg.GetInstanceList()
1493

    
1494
      for instance_name in instancelist:
1495
        inst = self.cfg.GetInstanceInfo(instance_name)
1496
        if inst.primary_node in node_to_primary:
1497
          node_to_primary[inst.primary_node].add(inst.name)
1498
        for secnode in inst.secondary_nodes:
1499
          if secnode in node_to_secondary:
1500
            node_to_secondary[secnode].add(inst.name)
1501

    
1502
    # end data gathering
1503

    
1504
    output = []
1505
    for node in nodelist:
1506
      node_output = []
1507
      for field in self.op.output_fields:
1508
        if field == "name":
1509
          val = node.name
1510
        elif field == "pinst_list":
1511
          val = list(node_to_primary[node.name])
1512
        elif field == "sinst_list":
1513
          val = list(node_to_secondary[node.name])
1514
        elif field == "pinst_cnt":
1515
          val = len(node_to_primary[node.name])
1516
        elif field == "sinst_cnt":
1517
          val = len(node_to_secondary[node.name])
1518
        elif field == "pip":
1519
          val = node.primary_ip
1520
        elif field == "sip":
1521
          val = node.secondary_ip
1522
        elif field == "tags":
1523
          val = list(node.GetTags())
1524
        elif field in self.dynamic_fields:
1525
          val = live_data[node.name].get(field, None)
1526
        else:
1527
          raise errors.ParameterError(field)
1528
        node_output.append(val)
1529
      output.append(node_output)
1530

    
1531
    return output
1532
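
    # Illustrative example (added comment, not in the original source): with
    # self.op.output_fields = ["name", "pinst_cnt", "mfree"], each row in
    # 'output' would look like ["node1.example.com", 2, 1024] -- the node
    # name, its primary instance count and (when queried live) its free memory.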

    
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False

  def ExpandNames(self):
    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output
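
    # Illustrative example (added comment, not in the original source): with
    # output_fields = ["node", "name", "size"] each row is a list of strings,
    # e.g. the node name, the logical volume name and its size, one row per
    # logical volume reported by the queried nodes.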

    
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # check reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    node_verify_list = [self.sstore.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
    }

    result = rpc.call_node_verify(node_verify_list, node_verify_param)
    for verifier in node_verify_list:
      if not result[verifier]:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                 " for remote verification" % verifier)
      if result[verifier]['nodelist']:
        for failed in result[verifier]['nodelist']:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, result[verifier]['nodelist'][failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = self.sstore.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      result = rpc.call_upload_file([node], fname)
      if not result[node]:
        logger.Error("could not copy file %s to node %s" % (fname, node))

    if self.op.readd:
      self.context.ReaddNode(new_node)
    else:
      self.context.AddNode(new_node)


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      }

    return result
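
    # Illustrative note (added comment, not in the original source): the dict
    # above is what cluster-info style clients consume; e.g. result["master"]
    # holds the master node name and result["architecture"] is a tuple such
    # as ("64bit", "x86_64") coming from the platform module.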

    
class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    a (disks_ok, device_info) tuple: disks_ok is false if the operation
    failed, and device_info is the list of (host, instance_visible_name,
    node_visible_name) tuples mapping node devices to instance devices
  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info
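
# Illustrative usage sketch (added comment, not in the original source):
# callers unpack the result as
#   disks_ok, device_info = _AssembleInstanceDisks(instance, cfg)
# and treat a false disks_ok as fatal; see LUActivateInstanceDisks above and
# _StartInstanceDisks below for the two patterns used in this module.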

    
def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    _SafeShutdownInstanceDisks(instance, self.cfg)


def _SafeShutdownInstanceDisks(instance, cfg):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  """
  ins_l = rpc.call_instance_list([instance.primary_node])
  ins_l = ins_l[instance.primary_node]
  if not type(ins_l) is list:
    raise errors.OpExecError("Can't contact node '%s'" %
                             instance.primary_node)

  if instance.name in ins_l:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")

  _ShutdownInstanceDisks(instance, cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are ignored;
  otherwise any shutdown error makes the function return False.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result
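
# Illustrative note (added comment, not in the original source): failover is
# the typical caller that needs ignore_primary=True, since the primary node
# may be unreachable at that point; see LUFailoverInstance.Exec below, which
# calls _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True).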

    
def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                             " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                             " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                             " needed %s MiB, available %s MiB" %
                             (node, reason, requested, free_mem))
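
# Illustrative usage sketch (added comment, not in the original source): a LU
# that is about to start an instance calls, e.g.:
#   _CheckNodeFreeMemory(self.cfg, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        instance.memory)
# and relies on the raised OpPrereqError to abort the operation early (see
# LUStartupInstance.CheckPrereq below).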

    
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")


class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # only a full reboot touches the secondary nodes' disks, so the other
      # reboot types only need the primary node locked
      primary_only = self.op.reboot_type != constants.INSTANCE_REBOOT_FULL
      self._LockInstancesNodes(primary_only=primary_only)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(instance, self.cfg)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   self.op.pnode)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node"  % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))


  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
                                                old_file_storage_dir,
                                                new_file_storage_dir)

      if not result:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                          "sda", "sdb"):
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    if not _RemoveDisks(instance, self.cfg):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name

    
class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    self.static_fields = frozenset([
      "name", "os", "pnode", "snodes",
      "admin_state", "admin_ram",
      "disk_template", "ip", "mac", "bridge",
      "sda_size", "sdb_size", "vcpus", "tags",
      "network_port", "kernel_path", "initrd_path",
      "hvm_boot_order", "hvm_acpi", "hvm_pae",
      "hvm_cdrom_image_path", "hvm_nic_type",
      "hvm_disk_type", "vnc_bind_address",
      ])
    _CheckOutputFields(static=self.static_fields,
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.do_locking:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    all_info = self.cfg.GetAllInstancesInfo()
    if self.do_locking:
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
    else:
      instance_names = all_info.keys()
    instance_list = [all_info[iname] for iname in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          disk = instance.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        elif field == "vcpus":
          val = instance.vcpus
        elif field == "tags":
          val = list(instance.GetTags())
        elif field in ("network_port", "kernel_path", "initrd_path",
                       "hvm_boot_order", "hvm_acpi", "hvm_pae",
                       "hvm_cdrom_image_path", "hvm_nic_type",
                       "hvm_disk_type", "vnc_bind_address"):
          val = getattr(instance, field, None)
          if val is not None:
            pass
          elif field in ("hvm_nic_type", "hvm_disk_type",
                         "kernel_path", "initrd_path"):
            val = "default"
          else:
            val = "-"
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output


class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
                         instance.name, instance.memory)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s do not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down"  %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logger.Info("Starting instance %s on node %s" %
                  (instance.name, target_node))

      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))


def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
        return False

  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, True, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True
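
# Illustrative note (added comment, not in the original source): once a device
# in the tree reports CreateOnSecondary(), 'force' stays True for that device
# and everything below it, so that whole subtree is physically created on the
# secondary node; devices for which neither they nor an ancestor report it are
# only recursed into, not created.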

    
def _GenerateUniqueNames(cfg, exts):
  """Generate suitable LV names.

  This will generate one logical volume name for each of the given
  extensions.

  """
  results = []
  for val in exts:
    new_id = cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results
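
# Illustrative example (added comment, not in the original source): with
# exts=[".sda", ".sdb"] this returns something like
# ["<unique-id>.sda", "<unique-id>.sdb"], where each <unique-id> comes from
# cfg.GenerateUniqueID().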

    
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
  """Generate a drbd8 device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id = (primary, secondary, port),
                          children = [dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev
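
# Illustrative note (added comment, not in the original source): the resulting
# device tree is a DRBD8 device (identified by the primary node, the secondary
# node and an allocated port) sitting on top of two LVs in the cluster volume
# group: a data LV of the requested size and a fixed 128 MiB metadata LV.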

    
def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
    disks = []
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name = "sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name = "sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                         disk_sz, names[0:2], "sda")
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                         swap_sz, names[2:4], "sdb")
    disks = [drbd_sda_dev, drbd_sdb_dev]
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
                                iv_name="sda", logical_id=(file_driver,
                                "%s/sda" % file_storage_dir))
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
                                iv_name="sdb", logical_id=(file_driver,
                                "%s/sdb" % file_storage_dir))
    disks = [file_sda_dev, file_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks

    
2966

    
2967
def _GetInstanceInfoText(instance):
2968
  """Compute that text that should be added to the disk's metadata.
2969

2970
  """
2971
  return "originstname+%s" % instance.name
2972

    
2973

    
2974
def _CreateDisks(cfg, instance):
2975
  """Create all disks for an instance.
2976

2977
  This abstracts away some work from AddInstance.
2978

2979
  Args:
2980
    instance: the instance object
2981

2982
  Returns:
2983
    True or False showing the success of the creation process
2984

2985
  """
2986
  info = _GetInstanceInfoText(instance)
2987

    
2988
  if instance.disk_template == constants.DT_FILE:
2989
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2990
    result = rpc.call_file_storage_dir_create(instance.primary_node,
2991
                                              file_storage_dir)
2992

    
2993
    if not result:
2994
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
2995
      return False
2996

    
2997
    if not result[0]:
2998
      logger.Error("failed to create directory '%s'" % file_storage_dir)
2999
      return False
3000

    
3001
  for device in instance.disks:
3002
    logger.Info("creating volume %s for instance %s" %
3003
                (device.iv_name, instance.name))
3004
    #HARDCODE
3005
    for secondary_node in instance.secondary_nodes:
3006
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
3007
                                        device, False, info):
3008
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
3009
                     (device.iv_name, device, secondary_node))
3010
        return False
3011
    #HARDCODE
3012
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3013
                                    instance, device, info):
3014
      logger.Error("failed to create volume %s on primary!" %
3015
                   device.iv_name)
3016
      return False
3017

    
3018
  return True
3019

    
3020

    
3021
def _RemoveDisks(instance, cfg):
3022
  """Remove all disks for an instance.
3023

3024
  This abstracts away some work from `AddInstance()` and
3025
  `RemoveInstance()`. Note that in case some of the devices couldn't
3026
  be removed, the removal will continue with the other ones (compare
3027
  with `_CreateDisks()`).
3028

3029
  Args:
3030
    instance: the instance object
3031

3032
  Returns:
3033
    True or False showing the success of the removal proces
3034

3035
  """
3036
  logger.Info("removing block devices for instance %s" % instance.name)
3037

    
3038
  result = True
3039
  for device in instance.disks:
3040
    for node, disk in device.ComputeNodeTree(instance.primary_node):
3041
      cfg.SetDiskID(disk, node)
3042
      if not rpc.call_blockdev_remove(node, disk):
3043
        logger.Error("could not remove block device %s on node %s,"
3044
                     " continuing anyway" %
3045
                     (device.iv_name, node))
3046
        result = False
3047

    
3048
  if instance.disk_template == constants.DT_FILE:
3049
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3050
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
3051
                                            file_storage_dir):
3052
      logger.Error("could not remove directory '%s'" % file_storage_dir)
3053
      result = False
3054

    
3055
  return result
3056

    
3057

    
3058
def _ComputeDiskSize(disk_template, disk_size, swap_size):
3059
  """Compute disk size requirements in the volume group
3060

3061
  This is currently hard-coded for the two-drive layout.
3062

3063
  """
3064
  # Required free disk space as a function of disk and swap space
3065
  req_size_dict = {
3066
    constants.DT_DISKLESS: None,
3067
    constants.DT_PLAIN: disk_size + swap_size,
3068
    # 256 MB are added for drbd metadata, 128MB for each drbd device
3069
    constants.DT_DRBD8: disk_size + swap_size + 256,
3070
    constants.DT_FILE: None,
3071
  }
3072

    
3073
  if disk_template not in req_size_dict:
3074
    raise errors.ProgrammerError("Disk template '%s' size requirement"
3075
                                 " is unknown" %  disk_template)
3076

    
3077
  return req_size_dict[disk_template]
3078

    
3079

    
3080
class LUCreateInstance(LogicalUnit):
3081
  """Create an instance.
3082

3083
  """
3084
  HPATH = "instance-add"
3085
  HTYPE = constants.HTYPE_INSTANCE
3086
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
3087
              "disk_template", "swap_size", "mode", "start", "vcpus",
3088
              "wait_for_sync", "ip_check", "mac"]
3089
  REQ_BGL = False
3090

    
3091
  def _ExpandNode(self, node):
3092
    """Expands and checks one node name.
3093

3094
    """
3095
    node_full = self.cfg.ExpandNodeName(node)
3096
    if node_full is None:
3097
      raise errors.OpPrereqError("Unknown node %s" % node)
3098
    return node_full
3099

    
3100
  def ExpandNames(self):
3101
    """ExpandNames for CreateInstance.
3102

3103
    Figure out the right locks for instance creation.
3104

3105
    """
3106
    self.needed_locks = {}
3107

    
3108
    # set optional parameters to none if they don't exist
3109
    for attr in ["kernel_path", "initrd_path", "pnode", "snode",
3110
                 "iallocator", "hvm_boot_order", "hvm_acpi", "hvm_pae",
3111
                 "hvm_cdrom_image_path", "hvm_nic_type", "hvm_disk_type",
3112
                 "vnc_bind_address"]:
3113
      if not hasattr(self.op, attr):
3114
        setattr(self.op, attr, None)
3115

    
3116
    # verify creation mode
3117
    if self.op.mode not in (constants.INSTANCE_CREATE,
3118
                            constants.INSTANCE_IMPORT):
3119
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3120
                                 self.op.mode)
3121
    # disk template and mirror node verification
3122
    if self.op.disk_template not in constants.DISK_TEMPLATES:
3123
      raise errors.OpPrereqError("Invalid disk template name")
3124

    
3125
    #### instance parameters check
3126

    
3127
    # instance name verification
3128
    hostname1 = utils.HostInfo(self.op.instance_name)
3129
    self.op.instance_name = instance_name = hostname1.name
3130

    
3131
    # this is just a preventive check, but someone might still add this
3132
    # instance in the meantime, and creation will fail at lock-add time
3133
    if instance_name in self.cfg.GetInstanceList():
3134
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3135
                                 instance_name)
3136

    
3137
    self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3138

    
3139
    # ip validity checks
3140
    ip = getattr(self.op, "ip", None)
3141
    if ip is None or ip.lower() == "none":
3142
      inst_ip = None
3143
    elif ip.lower() == "auto":
3144
      inst_ip = hostname1.ip
3145
    else:
3146
      if not utils.IsValidIP(ip):
3147
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
3148
                                   " like a valid IP" % ip)
3149
      inst_ip = ip
3150
    self.inst_ip = self.op.ip = inst_ip
3151
    # used in CheckPrereq for ip ping check
3152
    self.check_ip = hostname1.ip
3153

    
3154
    # MAC address verification
3155
    if self.op.mac != "auto":
3156
      if not utils.IsValidMac(self.op.mac.lower()):
3157
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
3158
                                   self.op.mac)
3159

    
3160
    # boot order verification
3161
    if self.op.hvm_boot_order is not None:
3162
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3163
        raise errors.OpPrereqError("invalid boot order specified,"
3164
                                   " must be one or more of [acdn]")
3165
    # file storage checks
3166
    if (self.op.file_driver and
3167
        not self.op.file_driver in constants.FILE_DRIVER):
3168
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3169
                                 self.op.file_driver)
3170

    
3171
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3172
      raise errors.OpPrereqError("File storage directory path not absolute")
3173

    
3174
    ### Node/iallocator related checks
3175
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
3176
      raise errors.OpPrereqError("One and only one of iallocator and primary"
3177
                                 " node must be given")
3178

    
3179
    if self.op.iallocator:
3180
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3181
    else:
3182
      self.op.pnode = self._ExpandNode(self.op.pnode)
3183
      nodelist = [self.op.pnode]
3184
      if self.op.snode is not None:
3185
        self.op.snode = self._ExpandNode(self.op.snode)
3186
        nodelist.append(self.op.snode)
3187
      self.needed_locks[locking.LEVEL_NODE] = nodelist
3188

    
3189
    # in case of import lock the source node too
3190
    if self.op.mode == constants.INSTANCE_IMPORT:
3191
      src_node = getattr(self.op, "src_node", None)
3192
      src_path = getattr(self.op, "src_path", None)
3193

    
3194
      if src_node is None or src_path is None:
3195
        raise errors.OpPrereqError("Importing an instance requires source"
3196
                                   " node and path options")
3197

    
3198
      if not os.path.isabs(src_path):
3199
        raise errors.OpPrereqError("The source path must be absolute")
3200

    
3201
      self.op.src_node = src_node = self._ExpandNode(src_node)
3202
      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3203
        self.needed_locks[locking.LEVEL_NODE].append(src_node)
3204

    
3205
    else: # INSTANCE_CREATE
3206
      if getattr(self.op, "os_type", None) is None:
3207
        raise errors.OpPrereqError("No guest OS specified")
3208

    
3209
  def _RunAllocator(self):
3210
    """Run the allocator based on input opcode.
3211

3212
    """
3213
    disks = [{"size": self.op.disk_size, "mode": "w"},
3214
             {"size": self.op.swap_size, "mode": "w"}]
3215
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3216
             "bridge": self.op.bridge}]
3217
    ial = IAllocator(self.cfg, self.sstore,
3218
                     mode=constants.IALLOCATOR_MODE_ALLOC,
3219
                     name=self.op.instance_name,
3220
                     disk_template=self.op.disk_template,
3221
                     tags=[],
3222
                     os=self.op.os_type,
3223
                     vcpus=self.op.vcpus,
3224
                     mem_size=self.op.mem_size,
3225
                     disks=disks,
3226
                     nics=nics,
3227
                     )
3228

    
3229
    ial.Run(self.op.iallocator)
3230

    
3231
    if not ial.success:
3232
      raise errors.OpPrereqError("Can't compute nodes using"
3233
                                 " iallocator '%s': %s" % (self.op.iallocator,
3234
                                                           ial.info))
3235
    if len(ial.nodes) != ial.required_nodes:
3236
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3237
                                 " of nodes (%s), required %s" %
3238
                                 (len(ial.nodes), ial.required_nodes))
3239
    self.op.pnode = ial.nodes[0]
3240
    logger.ToStdout("Selected nodes for the instance: %s" %
3241
                    (", ".join(ial.nodes),))
3242
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3243
                (self.op.instance_name, self.op.iallocator, ial.nodes))
3244
    if ial.required_nodes == 2:
3245
      self.op.snode = ial.nodes[1]
3246

    
3247
  def BuildHooksEnv(self):
3248
    """Build hooks env.
3249

3250
    This runs on master, primary and secondary nodes of the instance.
3251

3252
    """
3253
    env = {
3254
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3255
      "INSTANCE_DISK_SIZE": self.op.disk_size,
3256
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
3257
      "INSTANCE_ADD_MODE": self.op.mode,
3258
      }
3259
    if self.op.mode == constants.INSTANCE_IMPORT:
3260
      env["INSTANCE_SRC_NODE"] = self.op.src_node
3261
      env["INSTANCE_SRC_PATH"] = self.op.src_path
3262
      env["INSTANCE_SRC_IMAGE"] = self.src_image
3263

    
3264
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3265
      primary_node=self.op.pnode,
3266
      secondary_nodes=self.secondaries,
3267
      status=self.instance_status,
3268
      os_type=self.op.os_type,
3269
      memory=self.op.mem_size,
3270
      vcpus=self.op.vcpus,
3271
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3272
    ))
3273

    
3274
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3275
          self.secondaries)
3276
    return env, nl, nl
3277

    
3278

    
3279
  def CheckPrereq(self):
3280
    """Check prerequisites.
3281

3282
    """
3283
    if (not self.cfg.GetVGName() and
3284
        self.op.disk_template not in constants.DTS_NOT_LVM):
3285
      raise errors.OpPrereqError("Cluster does not support lvm-based"
3286
                                 " instances")
3287

    
3288
    if self.op.mode == constants.INSTANCE_IMPORT:
3289
      src_node = self.op.src_node
3290
      src_path = self.op.src_path
3291

    
3292
      export_info = rpc.call_export_info(src_node, src_path)
3293

    
3294
      if not export_info:
3295
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
3296

    
3297
      if not export_info.has_section(constants.INISECT_EXP):
3298
        raise errors.ProgrammerError("Corrupted export config")
3299

    
3300
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
3301
      if (int(ei_version) != constants.EXPORT_VERSION):
3302
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3303
                                   (ei_version, constants.EXPORT_VERSION))
3304

    
3305
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3306
        raise errors.OpPrereqError("Can't import instance with more than"
3307
                                   " one data disk")
3308

    
3309
      # FIXME: are the old os-es, disk sizes, etc. useful?
3310
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3311
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3312
                                                         'disk0_dump'))
3313
      self.src_image = diskimage
3314

    
3315
    # ip ping checks (we use the same ip that was resolved in ExpandNames)
3316

    
3317
    if self.op.start and not self.op.ip_check:
3318
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3319
                                 " adding an instance in start mode")
3320

    
3321
    if self.op.ip_check:
3322
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3323
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3324
                                   (self.check_ip, instance_name))
3325

    
3326
    # bridge verification
3327
    bridge = getattr(self.op, "bridge", None)
3328
    if bridge is None:
3329
      self.op.bridge = self.cfg.GetDefBridge()
3330
    else:
3331
      self.op.bridge = bridge
3332

    
3333
    #### allocator run
3334

    
3335
    if self.op.iallocator is not None:
3336
      self._RunAllocator()
3337

    
3338
    #### node related checks
3339

    
3340
    # check primary node
3341
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3342
    assert self.pnode is not None, \
3343
      "Cannot retrieve locked node %s" % self.op.pnode
3344
    self.secondaries = []
3345

    
3346
    # mirror node verification
3347
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3348
      if self.op.snode is None:
3349
        raise errors.OpPrereqError("The networked disk templates need"
3350
                                   " a mirror node")
3351
      if self.op.snode == pnode.name:
3352
        raise errors.OpPrereqError("The secondary node cannot be"
3353
                                   " the primary node.")
3354
      self.secondaries.append(self.op.snode)
3355

    
3356
    req_size = _ComputeDiskSize(self.op.disk_template,
3357
                                self.op.disk_size, self.op.swap_size)
3358

    
3359
    # Check lv size requirements
3360
    if req_size is not None:
3361
      nodenames = [pnode.name] + self.secondaries
3362
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3363
      for node in nodenames:
3364
        info = nodeinfo.get(node, None)
3365
        if not info:
3366
          raise errors.OpPrereqError("Cannot get current information"
3367
                                     " from node '%s'" % node)
3368
        vg_free = info.get('vg_free', None)
3369
        if not isinstance(vg_free, int):
3370
          raise errors.OpPrereqError("Can't compute free disk space on"
3371
                                     " node %s" % node)
3372
        if req_size > info['vg_free']:
3373
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3374
                                     " %d MB available, %d MB required" %
3375
                                     (node, info['vg_free'], req_size))
3376

    
3377
    # os verification
3378
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3379
    if not os_obj:
3380
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3381
                                 " primary node"  % self.op.os_type)
3382

    
3383
    if self.op.kernel_path == constants.VALUE_NONE:
3384
      raise errors.OpPrereqError("Can't set instance kernel to none")
3385

    
3386
    # bridge check on primary node
3387
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3388
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3389
                                 " destination node '%s'" %
3390
                                 (self.op.bridge, pnode.name))
3391

    
3392
    # memory check on primary node
3393
    if self.op.start:
3394
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3395
                           "creating instance %s" % self.op.instance_name,
3396
                           self.op.mem_size)
3397

    
3398
    # hvm_cdrom_image_path verification
3399
    if self.op.hvm_cdrom_image_path is not None:
3400
      # FIXME (als): shouldn't these checks happen on the destination node?
3401
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
3402
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
3403
                                   " be an absolute path or None, not %s" %
3404
                                   self.op.hvm_cdrom_image_path)
3405
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
3406
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
3407
                                   " regular file or a symlink pointing to"
3408
                                   " an existing regular file, not %s" %
3409
                                   self.op.hvm_cdrom_image_path)
3410

    
3411
    # vnc_bind_address verification
3412
    if self.op.vnc_bind_address is not None:
3413
      if not utils.IsValidIP(self.op.vnc_bind_address):
3414
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3415
                                   " like a valid IP address" %
3416
                                   self.op.vnc_bind_address)
3417

    
3418
    # Xen HVM device type checks
3419
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
3420
      if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
3421
        raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
3422
                                   " hypervisor" % self.op.hvm_nic_type)
3423
      if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
3424
        raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
3425
                                   " hypervisor" % self.op.hvm_disk_type)
3426

    
3427
    if self.op.start:
3428
      self.instance_status = 'up'
3429
    else:
3430
      self.instance_status = 'down'
3431

    
3432
  def Exec(self, feedback_fn):
3433
    """Create and add the instance to the cluster.
3434

3435
    """
3436
    instance = self.op.instance_name
3437
    pnode_name = self.pnode.name
3438

    
3439
    if self.op.mac == "auto":
3440
      mac_address = self.cfg.GenerateMAC()
3441
    else:
3442
      mac_address = self.op.mac
3443

    
3444
    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3445
    if self.inst_ip is not None:
3446
      nic.ip = self.inst_ip
3447

    
3448
    ht_kind = self.sstore.GetHypervisorType()
3449
    if ht_kind in constants.HTS_REQ_PORT:
3450
      network_port = self.cfg.AllocatePort()
3451
    else:
3452
      network_port = None
3453

    
3454
    if self.op.vnc_bind_address is None:
3455
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3456

    
3457
    # this is needed because os.path.join does not accept None arguments
3458
    if self.op.file_storage_dir is None:
3459
      string_file_storage_dir = ""
3460
    else:
3461
      string_file_storage_dir = self.op.file_storage_dir
3462

    
3463
    # build the full file storage dir path
3464
    file_storage_dir = os.path.normpath(os.path.join(
3465
                                        self.sstore.GetFileStorageDir(),
3466
                                        string_file_storage_dir, instance))
3467

    
3468

    
3469
    disks = _GenerateDiskTemplate(self.cfg,
3470
                                  self.op.disk_template,
3471
                                  instance, pnode_name,
3472
                                  self.secondaries, self.op.disk_size,
3473
                                  self.op.swap_size,
3474
                                  file_storage_dir,
3475
                                  self.op.file_driver)
3476

    
3477
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3478
                            primary_node=pnode_name,
3479
                            memory=self.op.mem_size,
3480
                            vcpus=self.op.vcpus,
3481
                            nics=[nic], disks=disks,
3482
                            disk_template=self.op.disk_template,
3483
                            status=self.instance_status,
3484
                            network_port=network_port,
3485
                            kernel_path=self.op.kernel_path,
3486
                            initrd_path=self.op.initrd_path,
3487
                            hvm_boot_order=self.op.hvm_boot_order,
3488
                            hvm_acpi=self.op.hvm_acpi,
3489
                            hvm_pae=self.op.hvm_pae,
3490
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3491
                            vnc_bind_address=self.op.vnc_bind_address,
3492
                            hvm_nic_type=self.op.hvm_nic_type,
3493
                            hvm_disk_type=self.op.hvm_disk_type,
3494
                            )
3495

    
3496
    feedback_fn("* creating instance disks...")
3497
    if not _CreateDisks(self.cfg, iobj):
3498
      _RemoveDisks(iobj, self.cfg)
3499
      raise errors.OpExecError("Device creation failed, reverting...")
3500

    
3501
    feedback_fn("adding instance %s to cluster config" % instance)
3502

    
3503
    self.cfg.AddInstance(iobj)
3504
    # Declare that we don't want to remove the instance lock anymore, as we've
3505
    # added the instance to the config
3506
    del self.remove_locks[locking.LEVEL_INSTANCE]
3507

    
3508
    if self.op.wait_for_sync:
3509
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3510
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3511
      # make sure the disks are not degraded (still sync-ing is ok)
3512
      time.sleep(15)
3513
      feedback_fn("* checking mirrors status")
3514
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3515
    else:
3516
      disk_abort = False
3517

    
3518
    if disk_abort:
3519
      _RemoveDisks(iobj, self.cfg)
3520
      self.cfg.RemoveInstance(iobj.name)
3521
      # Make sure the instance lock gets removed
3522
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
3523
      raise errors.OpExecError("There are some degraded disks for"
3524
                               " this instance")
3525

    
3526
    feedback_fn("creating os for instance %s on node %s" %
3527
                (instance, pnode_name))
3528

    
3529
    if iobj.disk_template != constants.DT_DISKLESS:
3530
      if self.op.mode == constants.INSTANCE_CREATE:
3531
        feedback_fn("* running the instance OS create scripts...")
3532
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3533
          raise errors.OpExecError("could not add os for instance %s"
3534
                                   " on node %s" %
3535
                                   (instance, pnode_name))
3536

    
3537
      elif self.op.mode == constants.INSTANCE_IMPORT:
3538
        feedback_fn("* running the instance OS import scripts...")
3539
        src_node = self.op.src_node
3540
        src_image = self.src_image
3541
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3542
                                                src_node, src_image):
3543
          raise errors.OpExecError("Could not import os for instance"
3544
                                   " %s on node %s" %
3545
                                   (instance, pnode_name))
3546
      else:
3547
        # also checked in the prereq part
3548
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3549
                                     % self.op.mode)
3550

    
3551
    if self.op.start:
3552
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3553
      feedback_fn("* starting instance...")
3554
      if not rpc.call_instance_start(pnode_name, iobj, None):
3555
        raise errors.OpExecError("Could not start instance")
3556

    
3557

    
3558
class LUConnectConsole(NoHooksLU):
3559
  """Connect to an instance's console.
3560

3561
  This is somewhat special in that it returns the command line that
3562
  you need to run on the master node in order to connect to the
3563
  console.
3564

3565
  """
3566
  _OP_REQP = ["instance_name"]
3567
  REQ_BGL = False
3568

    
3569
  def ExpandNames(self):
3570
    self._ExpandAndLockInstance()
3571

    
3572
  def CheckPrereq(self):
3573
    """Check prerequisites.
3574

3575
    This checks that the instance is in the cluster.
3576

3577
    """
3578
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3579
    assert self.instance is not None, \
3580
      "Cannot retrieve locked instance %s" % self.op.instance_name
3581

    
3582
  def Exec(self, feedback_fn):
3583
    """Connect to the console of an instance
3584

3585
    """
3586
    instance = self.instance
3587
    node = instance.primary_node
3588

    
3589
    node_insts = rpc.call_instance_list([node])[node]
3590
    if node_insts is False:
3591
      raise errors.OpExecError("Can't connect to node %s." % node)
3592

    
3593
    if instance.name not in node_insts:
3594
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3595

    
3596
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3597

    
3598
    hyper = hypervisor.GetHypervisor()
3599
    console_cmd = hyper.GetShellCommandForConsole(instance)
3600

    
3601
    # build ssh cmdline
3602
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3603

    
3604

    
3605
class LUReplaceDisks(LogicalUnit):
3606
  """Replace the disks of an instance.
3607

3608
  """
3609
  HPATH = "mirrors-replace"
3610
  HTYPE = constants.HTYPE_INSTANCE
3611
  _OP_REQP = ["instance_name", "mode", "disks"]
3612
  REQ_BGL = False
3613

    
3614
  def ExpandNames(self):
3615
    self._ExpandAndLockInstance()
3616

    
3617
    if not hasattr(self.op, "remote_node"):
3618
      self.op.remote_node = None
3619

    
3620
    ia_name = getattr(self.op, "iallocator", None)
3621
    if ia_name is not None:
3622
      if self.op.remote_node is not None:
3623
        raise errors.OpPrereqError("Give either the iallocator or the new"
3624
                                   " secondary, not both")
3625
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3626
    elif self.op.remote_node is not None:
3627
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3628
      if remote_node is None:
3629
        raise errors.OpPrereqError("Node '%s' not known" %
3630
                                   self.op.remote_node)
3631
      self.op.remote_node = remote_node
3632
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
3633
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3634
    else:
3635
      self.needed_locks[locking.LEVEL_NODE] = []
3636
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3637

    
3638
  def DeclareLocks(self, level):
3639
    # If we're not already locking all nodes in the set we have to declare the
3640
    # instance's primary/secondary nodes.
3641
    if (level == locking.LEVEL_NODE and
3642
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
3643
      self._LockInstancesNodes()
3644

    
3645
  def _RunAllocator(self):
3646
    """Compute a new secondary node using an IAllocator.
3647

3648
    """
3649
    ial = IAllocator(self.cfg, self.sstore,
3650
                     mode=constants.IALLOCATOR_MODE_RELOC,
3651
                     name=self.op.instance_name,
3652
                     relocate_from=[self.sec_node])
3653

    
3654
    ial.Run(self.op.iallocator)
3655

    
3656
    if not ial.success:
3657
      raise errors.OpPrereqError("Can't compute nodes using"
3658
                                 " iallocator '%s': %s" % (self.op.iallocator,
3659
                                                           ial.info))
3660
    if len(ial.nodes) != ial.required_nodes:
3661
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3662
                                 " of nodes (%s), required %s" %
3663
                                 (len(ial.nodes), ial.required_nodes))
3664
    self.op.remote_node = ial.nodes[0]
3665
    logger.ToStdout("Selected new secondary for the instance: %s" %
3666
                    self.op.remote_node)
3667

    
3668
  def BuildHooksEnv(self):
3669
    """Build hooks env.
3670

3671
    This runs on the master, the primary and all the secondaries.
3672

3673
    """
3674
    env = {
3675
      "MODE": self.op.mode,
3676
      "NEW_SECONDARY": self.op.remote_node,
3677
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3678
      }
3679
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3680
    nl = [
3681
      self.sstore.GetMasterNode(),
3682
      self.instance.primary_node,
3683
      ]
3684
    if self.op.remote_node is not None:
3685
      nl.append(self.op.remote_node)
3686
    return env, nl, nl
3687

    
3688
  def CheckPrereq(self):
3689
    """Check prerequisites.
3690

3691
    This checks that the instance is in the cluster.
3692

3693
    """
3694
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3695
    assert instance is not None, \
3696
      "Cannot retrieve locked instance %s" % self.op.instance_name
3697
    self.instance = instance
3698

    
3699
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3700
      raise errors.OpPrereqError("Instance's disk layout is not"
3701
                                 " network mirrored.")
3702

    
3703
    if len(instance.secondary_nodes) != 1:
3704
      raise errors.OpPrereqError("The instance has a strange layout,"
3705
                                 " expected one secondary but found %d" %
3706
                                 len(instance.secondary_nodes))
3707

    
3708
    self.sec_node = instance.secondary_nodes[0]
3709

    
3710
    ia_name = getattr(self.op, "iallocator", None)
3711
    if ia_name is not None:
3712
      self._RunAllocator()
3713

    
3714
    remote_node = self.op.remote_node
3715
    if remote_node is not None:
3716
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3717
      assert self.remote_node_info is not None, \
3718
        "Cannot retrieve locked node %s" % remote_node
3719
    else:
3720
      self.remote_node_info = None
3721
    if remote_node == instance.primary_node:
3722
      raise errors.OpPrereqError("The specified node is the primary node of"
3723
                                 " the instance.")
3724
    elif remote_node == self.sec_node:
3725
      if self.op.mode == constants.REPLACE_DISK_SEC:
3726
        # this is for DRBD8, where we can't execute the same mode of
3727
        # replacement as for drbd7 (no different port allocated)
3728
        raise errors.OpPrereqError("Same secondary given, cannot execute"
3729
                                   " replacement")
3730
    if instance.disk_template == constants.DT_DRBD8:
3731
      if (self.op.mode == constants.REPLACE_DISK_ALL and
3732
          remote_node is not None):
3733
        # switch to replace secondary mode
3734
        self.op.mode = constants.REPLACE_DISK_SEC
3735

    
3736
      if self.op.mode == constants.REPLACE_DISK_ALL:
3737
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3738
                                   " secondary disk replacement, not"
3739
                                   " both at once")
3740
      elif self.op.mode == constants.REPLACE_DISK_PRI:
3741
        if remote_node is not None:
3742
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3743
                                     " the secondary while doing a primary"
3744
                                     " node disk replacement")
3745
        self.tgt_node = instance.primary_node
3746
        self.oth_node = instance.secondary_nodes[0]
3747
      elif self.op.mode == constants.REPLACE_DISK_SEC:
3748
        self.new_node = remote_node # this can be None, in which case
3749
                                    # we don't change the secondary
3750
        self.tgt_node = instance.secondary_nodes[0]
3751
        self.oth_node = instance.primary_node
3752
      else:
3753
        raise errors.ProgrammerError("Unhandled disk replace mode")
3754

    
3755
    for name in self.op.disks:
3756
      if instance.FindDisk(name) is None:
3757
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3758
                                   (name, instance.name))
3759

    
3760
  def _ExecD8DiskOnly(self, feedback_fn):
3761
    """Replace a disk on the primary or secondary for dbrd8.
3762

3763
    The algorithm for replace is quite complicated:
3764
      - for each disk to be replaced:
3765
        - create new LVs on the target node with unique names
3766
        - detach old LVs from the drbd device
3767
        - rename old LVs to name_replaced.<time_t>
3768
        - rename new LVs to old LVs
3769
        - attach the new LVs (with the old names now) to the drbd device
3770
      - wait for sync across all devices
3771
      - for each modified disk:
3772
        - remove old LVs (which have the name name_replaces.<time_t>)
3773

3774
    Failures are not very well handled.
3775

3776
    """
3777
    steps_total = 6
3778
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3779
    instance = self.instance
3780
    iv_names = {}
3781
    vgname = self.cfg.GetVGName()
3782
    # start of work
3783
    cfg = self.cfg
3784
    tgt_node = self.tgt_node
3785
    oth_node = self.oth_node
3786

    
3787
    # Step: check device activation
3788
    self.proc.LogStep(1, steps_total, "check device existence")
3789
    info("checking volume groups")
3790
    my_vg = cfg.GetVGName()
3791
    results = rpc.call_vg_list([oth_node, tgt_node])
3792
    if not results:
3793
      raise errors.OpExecError("Can't list volume groups on the nodes")
3794
    for node in oth_node, tgt_node:
3795
      res = results.get(node, False)
3796
      if not res or my_vg not in res:
3797
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3798
                                 (my_vg, node))
3799
    for dev in instance.disks:
3800
      if not dev.iv_name in self.op.disks:
3801
        continue
3802
      for node in tgt_node, oth_node:
3803
        info("checking %s on %s" % (dev.iv_name, node))
3804
        cfg.SetDiskID(dev, node)
3805
        if not rpc.call_blockdev_find(node, dev):
3806
          raise errors.OpExecError("Can't find device %s on node %s" %
3807
                                   (dev.iv_name, node))
3808

    
3809
    # Step: check other node consistency
3810
    self.proc.LogStep(2, steps_total, "check peer consistency")
3811
    for dev in instance.disks:
3812
      if not dev.iv_name in self.op.disks:
3813
        continue
3814
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3815
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3816
                                   oth_node==instance.primary_node):
3817
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3818
                                 " to replace disks on this node (%s)" %
3819
                                 (oth_node, tgt_node))
3820

    
3821
    # Step: create new storage
3822
    self.proc.LogStep(3, steps_total, "allocate new storage")
3823
    for dev in instance.disks:
3824
      if not dev.iv_name in self.op.disks:
3825
        continue
3826
      size = dev.size
3827
      cfg.SetDiskID(dev, tgt_node)
3828
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3829
      names = _GenerateUniqueNames(cfg, lv_names)
3830
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3831
                             logical_id=(vgname, names[0]))
3832
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3833
                             logical_id=(vgname, names[1]))
3834
      new_lvs = [lv_data, lv_meta]
3835
      old_lvs = dev.children
3836
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3837
      info("creating new local storage on %s for %s" %
3838
           (tgt_node, dev.iv_name))
3839
      # since we *always* want to create this LV, we use the
3840
      # _Create...OnPrimary (which forces the creation), even if we
3841
      # are talking about the secondary node
3842
      for new_lv in new_lvs:
3843
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3844
                                        _GetInstanceInfoText(instance)):
3845
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3846
                                   " node '%s'" %
3847
                                   (new_lv.logical_id[1], tgt_node))
3848

    
3849
    # Step: for each lv, detach+rename*2+attach
3850
    self.proc.LogStep(4, steps_total, "change drbd configuration")
3851
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3852
      info("detaching %s drbd from local storage" % dev.iv_name)
3853
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3854
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3855
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3856
      #dev.children = []
3857
      #cfg.Update(instance)
3858

    
3859
      # ok, we created the new LVs, so now we know we have the needed
3860
      # storage; as such, we proceed on the target node to rename
3861
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3862
      # using the assumption that logical_id == physical_id (which in
3863
      # turn is the unique_id on that node)
3864

    
3865
      # FIXME(iustin): use a better name for the replaced LVs
3866
      temp_suffix = int(time.time())
3867
      ren_fn = lambda d, suff: (d.physical_id[0],
3868
                                d.physical_id[1] + "_replaced-%s" % suff)
3869
      # build the rename list based on what LVs exist on the node
3870
      rlist = []
3871
      for to_ren in old_lvs:
3872
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3873
        if find_res is not None: # device exists
3874
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3875

    
3876
      info("renaming the old LVs on the target node")
3877
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3878
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3879
      # now we rename the new LVs to the old LVs
3880
      info("renaming the new LVs on the target node")
3881
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3882
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3883
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3884

    
3885
      for old, new in zip(old_lvs, new_lvs):
3886
        new.logical_id = old.logical_id
3887
        cfg.SetDiskID(new, tgt_node)
3888

    
3889
      for disk in old_lvs:
3890
        disk.logical_id = ren_fn(disk, temp_suffix)
3891
        cfg.SetDiskID(disk, tgt_node)
3892

    
3893
      # now that the new lvs have the old name, we can add them to the device
3894
      info("adding new mirror component on %s" % tgt_node)
3895
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3896
        for new_lv in new_lvs:
3897
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
3898
            warning("Can't rollback device %s", hint="manually cleanup unused"
3899
                    " logical volumes")
3900
        raise errors.OpExecError("Can't add local storage to drbd")
3901

    
3902
      dev.children = new_lvs
3903
      cfg.Update(instance)
3904

    
3905
    # Step: wait for sync
3906

    
3907
    # this can fail as the old devices are degraded and _WaitForSync
3908
    # does a combined result over all disks, so we don't check its
3909
    # return value
3910
    self.proc.LogStep(5, steps_total, "sync devices")
3911
    _WaitForSync(cfg, instance, self.proc, unlock=True)
3912

    
3913
    # so check manually all the devices
3914
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3915
      cfg.SetDiskID(dev, instance.primary_node)
3916
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3917
      if is_degr:
3918
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3919

    
3920
    # Step: remove old storage
3921
    self.proc.LogStep(6, steps_total, "removing old storage")
3922
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3923
      info("remove logical volumes for %s" % name)
3924
      for lv in old_lvs:
3925
        cfg.SetDiskID(lv, tgt_node)
3926
        if not rpc.call_blockdev_remove(tgt_node, lv):
3927
          warning("Can't remove old LV", hint="manually remove unused LVs")
3928
          continue
3929

    
3930
  def _ExecD8Secondary(self, feedback_fn):
3931
    """Replace the secondary node for drbd8.
3932

3933
    The algorithm for replace is quite complicated:
3934
      - for all disks of the instance:
3935
        - create new LVs on the new node with same names
3936
        - shutdown the drbd device on the old secondary
3937
        - disconnect the drbd network on the primary
3938
        - create the drbd device on the new secondary
3939
        - network attach the drbd on the primary, using an artifice:
3940
          the drbd code for Attach() will connect to the network if it
3941
          finds a device which is connected to the good local disks but
3942
          not network enabled
3943
      - wait for sync across all devices
3944
      - remove all disks from the old secondary
3945

3946
    Failures are not very well handled.
3947

3948
    """
3949
    steps_total = 6
3950
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3951
    instance = self.instance
3952
    iv_names = {}
3953
    vgname = self.cfg.GetVGName()
3954
    # start of work
3955
    cfg = self.cfg
3956
    old_node = self.tgt_node
3957
    new_node = self.new_node
3958
    pri_node = instance.primary_node
3959

    
3960
    # Step: check device activation
3961
    self.proc.LogStep(1, steps_total, "check device existence")
3962
    info("checking volume groups")
3963
    my_vg = cfg.GetVGName()
3964
    results = rpc.call_vg_list([pri_node, new_node])
3965
    if not results:
3966
      raise errors.OpExecError("Can't list volume groups on the nodes")
3967
    for node in pri_node, new_node:
3968
      res = results.get(node, False)
3969
      if not res or my_vg not in res:
3970
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3971
                                 (my_vg, node))
3972
    for dev in instance.disks:
3973
      if not dev.iv_name in self.op.disks:
3974
        continue
3975
      info("checking %s on %s" % (dev.iv_name, pri_node))
3976
      cfg.SetDiskID(dev, pri_node)
3977
      if not rpc.call_blockdev_find(pri_node, dev):
3978
        raise errors.OpExecError("Can't find device %s on node %s" %
3979
                                 (dev.iv_name, pri_node))
3980

    
3981
    # Step: check other node consistency
3982
    self.proc.LogStep(2, steps_total, "check peer consistency")
3983
    for dev in instance.disks:
3984
      if not dev.iv_name in self.op.disks:
3985
        continue
3986
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3987
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3988
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
3989
                                 " unsafe to replace the secondary" %
3990
                                 pri_node)
3991

    
3992
    # Step: create new storage
3993
    self.proc.LogStep(3, steps_total, "allocate new storage")
3994
    for dev in instance.disks:
3995
      size = dev.size
3996
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3997
      # since we *always* want to create this LV, we use the
3998
      # _Create...OnPrimary (which forces the creation), even if we
3999
      # are talking about the secondary node
4000
      for new_lv in dev.children:
4001
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
4002
                                        _GetInstanceInfoText(instance)):
4003
          raise errors.OpExecError("Failed to create new LV named '%s' on"
4004
                                   " node '%s'" %
4005
                                   (new_lv.logical_id[1], new_node))
4006

    
4007
      iv_names[dev.iv_name] = (dev, dev.children)
4008

    
4009
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
4010
    for dev in instance.disks:
4011
      size = dev.size
4012
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4013
      # create new devices on new_node
4014
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4015
                              logical_id=(pri_node, new_node,
4016
                                          dev.logical_id[2]),
4017
                              children=dev.children)
4018
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
4019
                                        new_drbd, False,
4020
                                      _GetInstanceInfoText(instance)):
4021
        raise errors.OpExecError("Failed to create new DRBD on"
4022
                                 " node '%s'" % new_node)
4023

    
4024
    for dev in instance.disks:
4025
      # we have new devices, shutdown the drbd on the old secondary
4026
      info("shutting down drbd for %s on old node" % dev.iv_name)
4027
      cfg.SetDiskID(dev, old_node)
4028
      if not rpc.call_blockdev_shutdown(old_node, dev):
4029
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4030
                hint="Please cleanup this device manually as soon as possible")
4031

    
4032
    info("detaching primary drbds from the network (=> standalone)")
4033
    done = 0
4034
    for dev in instance.disks:
4035
      cfg.SetDiskID(dev, pri_node)
4036
      # set the physical (unique in bdev terms) id to None, meaning
4037
      # detach from network
4038
      dev.physical_id = (None,) * len(dev.physical_id)
4039
      # and 'find' the device, which will 'fix' it to match the
4040
      # standalone state
4041
      if rpc.call_blockdev_find(pri_node, dev):
4042
        done += 1
4043
      else:
4044
        warning("Failed to detach drbd %s from network, unusual case" %
4045
                dev.iv_name)
4046

    
4047
    if not done:
4048
      # no detaches succeeded (very unlikely)
4049
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
4050

    
4051
    # if we managed to detach at least one, we update all the disks of
4052
    # the instance to point to the new secondary
4053
    info("updating instance configuration")
4054
    for dev in instance.disks:
4055
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
4056
      cfg.SetDiskID(dev, pri_node)
4057
    cfg.Update(instance)
4058

    
4059
    # and now perform the drbd attach
4060
    info("attaching primary drbds to new secondary (standalone => connected)")
4061
    failures = []
4062
    for dev in instance.disks:
4063
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4064
      # since the attach is smart, it's enough to 'find' the device,
4065
      # it will automatically activate the network, if the physical_id
4066
      # is correct
4067
      cfg.SetDiskID(dev, pri_node)
4068
      if not rpc.call_blockdev_find(pri_node, dev):
4069
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4070
                "please do a gnt-instance info to see the status of disks")
4071

    
4072
    # this can fail as the old devices are degraded and _WaitForSync
4073
    # does a combined result over all disks, so we don't check its
4074
    # return value
4075
    self.proc.LogStep(5, steps_total, "sync devices")
4076
    _WaitForSync(cfg, instance, self.proc, unlock=True)
4077

    
4078
    # so check manually all the devices
4079
    for name, (dev, old_lvs) in iv_names.iteritems():
4080
      cfg.SetDiskID(dev, pri_node)
4081
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
4082
      if is_degr:
4083
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
4084

    
4085
    self.proc.LogStep(6, steps_total, "removing old storage")
4086
    for name, (dev, old_lvs) in iv_names.iteritems():
4087
      info("remove logical volumes for %s" % name)
4088
      for lv in old_lvs:
4089
        cfg.SetDiskID(lv, old_node)
4090
        if not rpc.call_blockdev_remove(old_node, lv):
4091
          warning("Can't remove LV on old secondary",
4092
                  hint="Cleanup stale volumes by hand")
4093

    
4094
  def Exec(self, feedback_fn):
4095
    """Execute disk replacement.
4096

4097
    This dispatches the disk replacement to the appropriate handler.
4098

4099
    """
4100
    instance = self.instance
4101

    
4102
    # Activate the instance disks if we're replacing them on a down instance
4103
    if instance.status == "down":
4104
      _StartInstanceDisks(self.cfg, instance, True)
4105

    
4106
    if instance.disk_template == constants.DT_DRBD8:
4107
      if self.op.remote_node is None:
4108
        fn = self._ExecD8DiskOnly
4109
      else:
4110
        fn = self._ExecD8Secondary
4111
    else:
4112
      raise errors.ProgrammerError("Unhandled disk replacement case")
4113

    
4114
    ret = fn(feedback_fn)
4115

    
4116
    # Deactivate the instance disks if we're replacing them on a down instance
4117
    if instance.status == "down":
4118
      _SafeShutdownInstanceDisks(instance, self.cfg)
4119

    
4120
    return ret
4121

    
4122

    
4123
class LUGrowDisk(LogicalUnit):
4124
  """Grow a disk of an instance.
4125

4126
  """
4127
  HPATH = "disk-grow"
4128
  HTYPE = constants.HTYPE_INSTANCE
4129
  _OP_REQP = ["instance_name", "disk", "amount"]
4130
  REQ_BGL = False
4131

    
4132
  def ExpandNames(self):
4133
    self._ExpandAndLockInstance()
4134
    self.needed_locks[locking.LEVEL_NODE] = []
4135
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4136

    
4137
  def DeclareLocks(self, level):
4138
    if level == locking.LEVEL_NODE:
4139
      self._LockInstancesNodes()
4140

    
4141
  def BuildHooksEnv(self):
4142
    """Build hooks env.
4143

4144
    This runs on the master, the primary and all the secondaries.
4145

4146
    """
4147
    env = {
4148
      "DISK": self.op.disk,
4149
      "AMOUNT": self.op.amount,
4150
      }
4151
    env.update(_BuildInstanceHookEnvByObject(self.instance))
4152
    nl = [
4153
      self.sstore.GetMasterNode(),
4154
      self.instance.primary_node,
4155
      ]
4156
    return env, nl, nl
4157

    
4158
  def CheckPrereq(self):
4159
    """Check prerequisites.
4160

4161
    This checks that the instance is in the cluster.
4162

4163
    """
4164
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4165
    assert instance is not None, \
4166
      "Cannot retrieve locked instance %s" % self.op.instance_name
4167

    
4168
    self.instance = instance
4169

    
4170
    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4171
      raise errors.OpPrereqError("Instance's disk layout does not support"
4172
                                 " growing.")
4173

    
4174
    if instance.FindDisk(self.op.disk) is None:
4175
      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4176
                                 (self.op.disk, instance.name))
4177

    
4178
    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4179
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
4180
    for node in nodenames:
4181
      info = nodeinfo.get(node, None)
4182
      if not info:
4183
        raise errors.OpPrereqError("Cannot get current information"
4184
                                   " from node '%s'" % node)
4185
      vg_free = info.get('vg_free', None)
4186
      if not isinstance(vg_free, int):
4187
        raise errors.OpPrereqError("Can't compute free disk space on"
4188
                                   " node %s" % node)
4189
      if self.op.amount > info['vg_free']:
4190
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
4191
                                   " %d MiB available, %d MiB required" %
4192
                                   (node, info['vg_free'], self.op.amount))
4193

    
4194
  def Exec(self, feedback_fn):
4195
    """Execute disk grow.
4196

4197
    """
4198
    instance = self.instance
4199
    disk = instance.FindDisk(self.op.disk)
4200
    for node in (instance.secondary_nodes + (instance.primary_node,)):
4201
      self.cfg.SetDiskID(disk, node)
4202
      result = rpc.call_blockdev_grow(node, disk, self.op.amount)
4203
      if not result or not isinstance(result, (list, tuple)) or len(result) != 2:
4204
        raise errors.OpExecError("grow request failed to node %s" % node)
4205
      elif not result[0]:
4206
        raise errors.OpExecError("grow request failed to node %s: %s" %
4207
                                 (node, result[1]))
4208
    disk.RecordGrow(self.op.amount)
4209
    self.cfg.Update(instance)
4210
    return
4211

    
4212

    
4213
class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = self.cfg.ExpandInstanceName(name)
        if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" % name)
        self.wanted_names.append(full_name)
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
    else:
      self.wanted_names = None
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]
    return

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                           instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "vcpus": instance.vcpus,
        }

      htkind = self.sstore.GetHypervisorType()
      if htkind == constants.HT_XEN_PVM30:
        idict["kernel_path"] = instance.kernel_path
        idict["initrd_path"] = instance.initrd_path

      if htkind == constants.HT_XEN_HVM31:
        idict["hvm_boot_order"] = instance.hvm_boot_order
        idict["hvm_acpi"] = instance.hvm_acpi
        idict["hvm_pae"] = instance.hvm_pae
        idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
        idict["hvm_nic_type"] = instance.hvm_nic_type
        idict["hvm_disk_type"] = instance.hvm_disk_type

      if htkind in constants.HTS_REQ_PORT:
        if instance.vnc_bind_address is None:
          vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
        else:
          vnc_bind_address = instance.vnc_bind_address
        if instance.network_port is None:
          vnc_console_port = None
        elif vnc_bind_address == constants.BIND_ADDRESS_GLOBAL:
          vnc_console_port = "%s:%s" % (instance.primary_node,
                                        instance.network_port)
        elif vnc_bind_address == constants.LOCALHOST_IP_ADDRESS:
          vnc_console_port = "%s:%s on node %s" % (vnc_bind_address,
                                                   instance.network_port,
                                                   instance.primary_node)
        else:
          vnc_console_port = "%s:%s" % (instance.vnc_bind_address,
                                        instance.network_port)
        idict["vnc_console_port"] = vnc_console_port
        idict["vnc_bind_address"] = vnc_bind_address
        idict["network_port"] = instance.network_port

      result[instance.name] = idict

    return result


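# Example (illustrative sketch) of the per-instance dictionary built by
# LUQueryInstanceData for a Xen PVM instance; all names and values below are
# made up, only the keys come from the code above:
#
#   {"name": "inst1.example.com", "config_state": "up", "run_state": "up",
#    "pnode": "node1.example.com", "snodes": ["node2.example.com"],
#    "os": "debian-etch", "memory": 512, "vcpus": 1,
#    "nics": [("aa:00:00:12:34:56", "192.0.2.10", "xen-br0")],
#    "disks": [...],
#    "kernel_path": "/boot/vmlinuz-2.6-xenU", "initrd_path": None}
#
# Exec returns a dictionary mapping each requested instance name to such a
# structure.
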
class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge or self.mac:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      if self.mac:
        mac = self.mac
      else:
        mac = self.instance.nics[0].mac
      args['nics'] = [(ip, bridge, mac)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the supplied parameters and the instance's current state.

    """
    # FIXME: all the parameters could be checked before, in ExpandNames, or in
    # a separate CheckArguments function, if we implement one, so the operation
    # can be aborted without waiting for any lock, should it have an error...
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
    self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
    self.hvm_pae = getattr(self.op, "hvm_pae", None)
    self.hvm_nic_type = getattr(self.op, "hvm_nic_type", None)
    self.hvm_disk_type = getattr(self.op, "hvm_disk_type", None)
    self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
    self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
    self.force = getattr(self.op, "force", None)
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
                 self.kernel_path, self.initrd_path, self.hvm_boot_order,
                 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
                 self.vnc_bind_address, self.hvm_nic_type, self.hvm_disk_type]
    if all_parms.count(None) == len(all_parms):
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    if self.kernel_path is not None:
      self.do_kernel_path = True
      if self.kernel_path == constants.VALUE_NONE:
        raise errors.OpPrereqError("Can't set instance to no kernel")

      if self.kernel_path != constants.VALUE_DEFAULT:
        if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
                                     " filename")
    else:
      self.do_kernel_path = False

    if self.initrd_path is not None:
      self.do_initrd_path = True
      if self.initrd_path not in (constants.VALUE_NONE,
                                  constants.VALUE_DEFAULT):
        if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
                                     " filename")
    else:
      self.do_initrd_path = False

    # boot order verification
    if self.hvm_boot_order is not None:
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
        if len(self.hvm_boot_order.strip("acdn")) != 0:
          raise errors.OpPrereqError("invalid boot order specified,"
                                     " must be one or more of [acdn]"
                                     " or 'default'")

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      if not (os.path.isabs(self.op.hvm_cdrom_image_path) or
              self.op.hvm_cdrom_image_path.lower() == "none"):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not (os.path.isfile(self.op.hvm_cdrom_image_path) or
              self.op.hvm_cdrom_image_path.lower() == "none"):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.warn = []
    if self.mem is not None and not self.force:
      pnode = self.instance.primary_node
      nodelist = [pnode]
      nodelist.extend(instance.secondary_nodes)
      instance_info = rpc.call_instance_info(pnode, instance.name)
      nodeinfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

      if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s" % pnode)
      else:
        if instance_info:
          current_mem = instance_info['memory']
        else:
          # Assume instance not running
          # (there is a slight race condition here, but it's not very probable,
          # and we have no other way to check)
          current_mem = 0
        miss_mem = self.mem - current_mem - nodeinfo[pnode]['memory_free']
        if miss_mem > 0:
          raise errors.OpPrereqError("This change will prevent the instance"
                                     " from starting, due to %d MB of memory"
                                     " missing on its primary node" % miss_mem)

      for node in instance.secondary_nodes:
        if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
          self.warn.append("Can't get info from secondary node %s" % node)
        elif self.mem > nodeinfo[node]['memory_free']:
          self.warn.append("Not enough memory to failover instance to"
                           " secondary node %s" % node)

    # Xen HVM device type checks
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if self.op.hvm_nic_type is not None:
        if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
          raise errors.OpPrereqError("Invalid NIC type %s specified for Xen"
                                     " HVM hypervisor" % self.op.hvm_nic_type)
      if self.op.hvm_disk_type is not None:
        if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
          raise errors.OpPrereqError("Invalid disk type %s specified for Xen"
                                     " HVM hypervisor" % self.op.hvm_disk_type)

    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    # Process here the warnings from CheckPrereq, as we don't have a
    # feedback_fn there.
    for warn in self.warn:
      feedback_fn("WARNING: %s" % warn)

    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        instance.hvm_boot_order = None
      else:
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))
    if self.hvm_acpi is not None:
      instance.hvm_acpi = self.hvm_acpi
      result.append(("hvm_acpi", self.hvm_acpi))
    if self.hvm_pae is not None:
      instance.hvm_pae = self.hvm_pae
      result.append(("hvm_pae", self.hvm_pae))
    if self.hvm_nic_type is not None:
      instance.hvm_nic_type = self.hvm_nic_type
      result.append(("hvm_nic_type", self.hvm_nic_type))
    if self.hvm_disk_type is not None:
      instance.hvm_disk_type = self.hvm_disk_type
      result.append(("hvm_disk_type", self.hvm_disk_type))
    if self.hvm_cdrom_image_path:
      if self.hvm_cdrom_image_path == constants.VALUE_NONE:
        instance.hvm_cdrom_image_path = None
      else:
        instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
      result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
    if self.vnc_bind_address:
      instance.vnc_bind_address = self.vnc_bind_address
      result.append(("vnc_bind_address", self.vnc_bind_address))

    self.cfg.Update(instance)

    return result


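# Example (illustrative sketch): LUSetInstanceParams.Exec returns a list of
# (parameter, new value) pairs for the fields that were actually changed,
# e.g. after raising the memory and moving the first NIC to another bridge
# (values below are made up):
#
#   [("mem", 1024), ("bridge", "xen-br1")]
#
# As the Exec docstring says, the new values only take effect at the next
# restart of the instance.
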
class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = ['nodes']
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)


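# Example (illustrative sketch) of the structure returned by
# LUQueryExports.Exec, as described in its docstring; node and instance
# names are made up:
#
#   {"node1.example.com": ["inst1.example.com", "inst2.example.com"],
#    "node2.example.com": []}
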
class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
          "Cannot retrieve locked instance %s" % self.op.instance_name

    self.dst_node = self.cfg.GetNodeInfo(
      self.cfg.ExpandNodeName(self.op.target_node))

    assert self.dst_node is not None, \
          "Cannot retrieve locked node %s" % self.op.target_node

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(instance, self.cfg)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
        logger.Error("could not export block device %s from node %s to node %s"
                     % (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from node %s" %
                     (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal;
    # if we proceed, the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      exportlist = rpc.call_export_list(nodelist)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


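# Example (illustrative sketch): the export opcode carries the three fields
# this LU requires (see _OP_REQP).  The opcode class name and the node and
# instance names below are assumptions for illustration:
#
#   op = opcodes.OpExportInstance(instance_name="inst1.example.com",
#                                 target_node="node3.example.com",
#                                 shutdown=True)
#
# With shutdown=True the instance is stopped, its "sda" volume is
# snapshotted and copied to the target node, the instance is started again,
# and any older export of the same instance on other nodes is removed.
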
class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.
    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    exportlist = rpc.call_export_list(self.acquired_locks[locking.LEVEL_NODE])
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
        if not rpc.call_export_remove(node, instance_name):
          logger.Error("could not remove export for instance %s"
                       " on node %s" % (instance_name, node))

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


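# Example (illustrative sketch): removing the export of an instance that has
# already been deleted from the cluster requires its fully qualified name,
# since the name can no longer be expanded from the configuration.  The
# opcode class name below is an assumption; the LU only requires
# "instance_name":
#
#   op = opcodes.OpRemoveExport(instance_name="inst1.example.com")
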
class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


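# Example (illustrative sketch): how the tag kind maps to locking in
# TagsLU.ExpandNames, with made-up names:
#
#   kind=TAG_CLUSTER                  -> no extra locks are declared
#   kind=TAG_NODE, name="node1"       -> needed_locks[LEVEL_NODE] = "node1"
#   kind=TAG_INSTANCE, name="inst1"   -> needed_locks[LEVEL_INSTANCE] = "inst1"
#
# CheckPrereq then resolves self.target to the cluster, node or instance
# object whose tags will be read or modified by the concrete subclasses.
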
class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the matching (path, tag) pairs.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


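# Example (illustrative sketch): searching for the pattern "^web" could
# return (path, tag) pairs like the following, with made-up names:
#
#   [("/cluster", "webfarm"),
#    ("/instances/inst1.example.com", "webserver"),
#    ("/nodes/node1.example.com", "web-zone")]
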
class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


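# Example (illustrative sketch) of the subset check in LUDelTags.CheckPrereq:
# with current tags {"a", "b"}, a request to delete ["a", "c"] fails because
# frozenset(["a", "c"]) <= set(["a", "b"]) is False, and the difference
# {"c"} is reported as "Tag(s) 'c' not found".
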
class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))


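# Example (illustrative sketch): the test-delay opcode carries the fields
# this LU requires.  Assuming the opcode class is opcodes.OpTestDelay and
# made-up node names, sleeping five seconds on the master and on two nodes
# would look like:
#
#   op = opcodes.OpTestDelay(duration=5.0, on_master=True,
#                            on_nodes=["node1.example.com",
#                                      "node2.example.com"])
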
class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has the following sets of attributes:
    - cfg/sstore that are needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, cfg, sstore, mode, name, **kwargs):
    self.cfg = cfg
    self.sstore = sstore
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    # cluster data
    data = {
      "version": 1,
      "cluster_name": self.sstore.GetClusterName(),
      "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      # we don't have job IDs
      }

    i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()
    node_data = rpc.call_node_info(node_list, cfg.GetVGName())
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      if nname not in node_data or not isinstance(node_data[nname], dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname]
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += iinfo.memory
          if iinfo.status == "up":
            i_p_up_mem += iinfo.memory

      # compute memory used by instances
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": iinfo.vcpus,
        "memory": iinfo.memory,
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template,
                                  self.disks[0]["size"], self.disks[1]["size"])

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1

    disk_space = _ComputeDiskSize(instance.disk_template,
                                  instance.disks[0].size,
                                  instance.disks[1].size)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
    """Run an instance allocator and return the results.

    """
    result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)

    if not isinstance(result, (list, tuple)) or len(result) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict


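# Example (illustrative sketch) of the JSON document handed to an external
# allocator script for an "allocate" request, trimmed to the most relevant
# keys; all names and sizes are made up and some keys are elided:
#
#   {"version": 1, "cluster_name": "cluster.example.com",
#    "nodes": {"node1.example.com": {"total_memory": 4096,
#                                    "free_memory": 2048, ...}, ...},
#    "instances": {...},
#    "request": {"type": "allocate", "name": "inst2.example.com",
#                "disk_template": "plain", "memory": 512, "vcpus": 1,
#                "disks": [{"size": 1024, "mode": "w"},
#                          {"size": 4096, "mode": "w"}],
#                "required_nodes": 1, ...}}
#
# The script must answer with a JSON object providing at least the
# "success", "info" and "nodes" keys checked by _ValidateResult, e.g.:
#
#   {"success": true, "info": "allocation successful",
#    "nodes": ["node1.example.com"]}
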
class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    of the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       )
    else:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result
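
# Example (illustrative sketch): exercising the allocator framework without
# touching the cluster.  The opcode class name is an assumption; the
# direction and mode constants are the ones used above:
#
#   op = opcodes.OpTestAllocator(direction=constants.IALLOCATOR_DIR_IN,
#                                mode=constants.IALLOCATOR_MODE_RELOC,
#                                name="inst1.example.com")
#
# With IALLOCATOR_DIR_IN the LU only returns the JSON input it would feed to
# an allocator; with IALLOCATOR_DIR_OUT it runs the script named by the
# "allocator" attribute and returns its raw, unvalidated output.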