root / lib / cmdlib.py @ 3fa93523
1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import locking
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import serializer
45

    
46

    
47
class LogicalUnit(object):
48
  """Logical Unit base class.
49

50
  Subclasses must follow these rules:
51
    - implement ExpandNames
52
    - implement CheckPrereq
53
    - implement Exec
54
    - implement BuildHooksEnv
55
    - redefine HPATH and HTYPE
56
    - optionally redefine their run requirements:
57
        REQ_MASTER: the LU needs to run on the master node
58
        REQ_WSSTORE: the LU needs a writable SimpleStore
59
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60

61
  Note that all commands require root permissions.
62

63
  """
64
  HPATH = None
65
  HTYPE = None
66
  _OP_REQP = []
67
  REQ_MASTER = True
68
  REQ_WSSTORE = False
69
  REQ_BGL = True
70

    
71
  def __init__(self, processor, op, context, sstore):
72
    """Constructor for LogicalUnit.
73

74
    This needs to be overridden in derived classes in order to check op
75
    validity.
76

77
    """
78
    self.proc = processor
79
    self.op = op
80
    self.cfg = context.cfg
81
    self.sstore = sstore
82
    self.context = context
83
    # Dicts used to declare locking needs to mcpu
84
    self.needed_locks = None
85
    self.acquired_locks = {}
86
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
87
    self.add_locks = {}
88
    self.remove_locks = {}
89
    # Used to force good behavior when calling helper functions
90
    self.recalculate_locks = {}
91
    self.__ssh = None
92

    
93
    for attr_name in self._OP_REQP:
94
      attr_val = getattr(op, attr_name, None)
95
      if attr_val is None:
96
        raise errors.OpPrereqError("Required parameter '%s' missing" %
97
                                   attr_name)
98

    
99
    if not self.cfg.IsCluster():
100
      raise errors.OpPrereqError("Cluster not initialized yet,"
101
                                 " use 'gnt-cluster init' first.")
102
    if self.REQ_MASTER:
103
      master = sstore.GetMasterNode()
104
      if master != utils.HostInfo().name:
105
        raise errors.OpPrereqError("Commands must be run on the master"
106
                                   " node %s" % master)
107

    
108
  def __GetSSH(self):
109
    """Returns the SshRunner object
110

111
    """
112
    if not self.__ssh:
113
      self.__ssh = ssh.SshRunner(self.sstore)
114
    return self.__ssh
115

    
116
  ssh = property(fget=__GetSSH)
117

    
118
  def ExpandNames(self):
119
    """Expand names for this LU.
120

121
    This method is called before starting to execute the opcode, and it should
122
    update all the parameters of the opcode to their canonical form (e.g. a
123
    short node name must be fully expanded after this method has successfully
124
    completed). This way locking, hooks, logging, etc. can work correctly.
125

126
    LUs which implement this method must also populate the self.needed_locks
127
    member, as a dict with lock levels as keys, and a list of needed lock names
128
    as values. Rules:
129
      - Use an empty dict if you don't need any lock
130
      - If you don't need any lock at a particular level omit that level
131
      - Don't put anything for the BGL level
132
      - If you want all locks at a level use locking.ALL_SET as a value
133

134
    If you need to share locks (rather than acquire them exclusively) at one
135
    level you can modify self.share_locks, setting a true value (usually 1) for
136
    that level. By default locks are not shared.
137

138
    Examples:
139
    # Acquire all nodes and one instance
140
    self.needed_locks = {
141
      locking.LEVEL_NODE: locking.ALL_SET,
142
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
143
    }
144
    # Acquire just two nodes
145
    self.needed_locks = {
146
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
147
    }
148
    # Acquire no locks
149
    self.needed_locks = {} # No, you can't leave it to the default value None
150

151
    """
152
    # The implementation of this method is mandatory only if the new LU is
153
    # concurrent, so that old LUs don't need to be changed all at the same
154
    # time.
155
    if self.REQ_BGL:
156
      self.needed_locks = {} # Exclusive LUs don't need locks.
157
    else:
158
      raise NotImplementedError
159

    
160
  def DeclareLocks(self, level):
161
    """Declare LU locking needs for a level
162

163
    While most LUs can just declare their locking needs at ExpandNames time,
164
    sometimes there's the need to calculate some locks after having acquired
165
    the ones before. This function is called just before acquiring locks at a
166
    particular level, but after acquiring the ones at lower levels, and permits
167
    such calculations. It can be used to modify self.needed_locks, and by
168
    default it does nothing.
169

170
    This function is only called if you have something already set in
171
    self.needed_locks for the level.
172

173
    @param level: Locking level which is going to be locked
174
    @type level: member of ganeti.locking.LEVELS
175

176
    """
177

    
178
  def CheckPrereq(self):
179
    """Check prerequisites for this LU.
180

181
    This method should check that the prerequisites for the execution
182
    of this LU are fulfilled. It can do internode communication, but
183
    it should be idempotent - no cluster or system changes are
184
    allowed.
185

186
    The method should raise errors.OpPrereqError in case something is
187
    not fulfilled. Its return value is ignored.
188

189
    This method should also update all the parameters of the opcode to
190
    their canonical form if it hasn't been done by ExpandNames before.
191

192
    """
193
    raise NotImplementedError
194

    
195
  def Exec(self, feedback_fn):
196
    """Execute the LU.
197

198
    This method should implement the actual work. It should raise
199
    errors.OpExecError for failures that are somewhat dealt with in
200
    code, or expected.
201

202
    """
203
    raise NotImplementedError
204

    
205
  def BuildHooksEnv(self):
206
    """Build hooks environment for this LU.
207

208
    This method should return a three-element tuple consisting of: a dict
209
    containing the environment that will be used for running the
210
    specific hook for this LU, a list of node names on which the hook
211
    should run before the execution, and a list of node names on which
212
    the hook should run after the execution.
213

214
    The keys of the dict must not have 'GANETI_' prefixed as this will
215
    be handled in the hooks runner. Also note additional keys will be
216
    added by the hooks runner. If the LU doesn't define any
217
    environment, an empty dict (and not None) should be returned.
218

219
    If there are no nodes to run hooks on, return an empty list (and not None).
220

221
    Note that if the HPATH for a LU class is None, this function will
222
    not be called.
223

224
    """
225
    raise NotImplementedError
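    # Illustrative sketch, not part of the original code: a concrete LU
    # typically returns something along the lines of
    #
    #   env = {"OP_TARGET": self.op.node_name, "NODE_NAME": self.op.node_name}
    #   return env, [self.sstore.GetMasterNode()], self.cfg.GetNodeList()
    #
    # i.e. the environment dict, the list of nodes for the pre-hooks and the
    # list of nodes for the post-hooks.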
226

    
227
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
228
    """Notify the LU about the results of its hooks.
229

230
    This method is called every time a hooks phase is executed, and notifies
231
    the Logical Unit about the hooks' result. The LU can then use it to alter
232
    its result based on the hooks.  By default the method does nothing and the
233
    previous result is passed back unchanged but any LU can define it if it
234
    wants to use the local cluster hook-scripts somehow.
235

236
    Args:
237
      phase: the hooks phase that has just been run
238
      hook_results: the results of the multi-node hooks rpc call
239
      feedback_fn: function to send feedback back to the caller
240
      lu_result: the previous result this LU had, or None in the PRE phase.
241

242
    """
243
    return lu_result
244

    
245
  def _ExpandAndLockInstance(self):
246
    """Helper function to expand and lock an instance.
247

248
    Many LUs that work on an instance take its name in self.op.instance_name
249
    and need to expand it and then declare the expanded name for locking. This
250
    function does it, and then updates self.op.instance_name to the expanded
251
    name. It also initializes needed_locks as a dict, if this hasn't been done
252
    before.
253

254
    """
255
    if self.needed_locks is None:
256
      self.needed_locks = {}
257
    else:
258
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
259
        "_ExpandAndLockInstance called with instance-level locks set"
260
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
261
    if expanded_name is None:
262
      raise errors.OpPrereqError("Instance '%s' not known" %
263
                                  self.op.instance_name)
264
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
265
    self.op.instance_name = expanded_name
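  # Usage sketch, not part of the original code: an LU operating on a single
  # instance would typically declare its locks with something like
  #
  #   def ExpandNames(self):
  #     self._ExpandAndLockInstance()
  #
  # which leaves self.needed_locks[locking.LEVEL_INSTANCE] pointing at the
  # fully-expanded instance name.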
266

    
267
  def _LockInstancesNodes(self, primary_only=False):
268
    """Helper function to declare instances' nodes for locking.
269

270
    This function should be called after locking one or more instances to lock
271
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
272
    with all primary or secondary nodes for instances already locked and
273
    present in self.needed_locks[locking.LEVEL_INSTANCE].
274

275
    It should be called from DeclareLocks, and for safety only works if
276
    self.recalculate_locks[locking.LEVEL_NODE] is set.
277

278
    In the future it may grow parameters to just lock some instance's nodes, or
279
    to just lock primaries or secondary nodes, if needed.
280

281
    It should be called from DeclareLocks in a way similar to:
282

283
    if level == locking.LEVEL_NODE:
284
      self._LockInstancesNodes()
285

286
    @type primary_only: boolean
287
    @param primary_only: only lock primary nodes of locked instances
288

289
    """
290
    assert locking.LEVEL_NODE in self.recalculate_locks, \
291
      "_LockInstancesNodes helper function called with no nodes to recalculate"
292

    
293
    # TODO: check if we've really been called with the instance locks held
294

    
295
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
296
    # future we might want to have different behaviors depending on the value
297
    # of self.recalculate_locks[locking.LEVEL_NODE]
298
    wanted_nodes = []
299
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
300
      instance = self.context.cfg.GetInstanceInfo(instance_name)
301
      wanted_nodes.append(instance.primary_node)
302
      if not primary_only:
303
        wanted_nodes.extend(instance.secondary_nodes)
304

    
305
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
306
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
307
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
308
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
309

    
310
    del self.recalculate_locks[locking.LEVEL_NODE]
311

    
312

    
313
class NoHooksLU(LogicalUnit):
314
  """Simple LU which runs no hooks.
315

316
  This LU is intended as a parent for other LogicalUnits which will
317
  run no hooks, in order to reduce duplicate code.
318

319
  """
320
  HPATH = None
321
  HTYPE = None
322

    
323

    
324
def _GetWantedNodes(lu, nodes):
325
  """Returns list of checked and expanded node names.
326

327
  Args:
328
    nodes: Non-empty list of node names (strings) to expand
329

330
  """
331
  if not isinstance(nodes, list):
332
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
333

    
334
  if not nodes:
335
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
336
      " non-empty list of nodes whose name is to be expanded.")
337

    
338
  wanted = []
339
  for name in nodes:
340
    node = lu.cfg.ExpandNodeName(name)
341
    if node is None:
342
      raise errors.OpPrereqError("No such node name '%s'" % name)
343
    wanted.append(node)
344

    
345
  return utils.NiceSort(wanted)
346

    
347

    
348
def _GetWantedInstances(lu, instances):
349
  """Returns list of checked and expanded instance names.
350

351
  Args:
352
    instances: List of instance names (strings), or an empty list for all
353

354
  """
355
  if not isinstance(instances, list):
356
    raise errors.OpPrereqError("Invalid argument type 'instances'")
357

    
358
  if instances:
359
    wanted = []
360

    
361
    for name in instances:
362
      instance = lu.cfg.ExpandInstanceName(name)
363
      if instance is None:
364
        raise errors.OpPrereqError("No such instance name '%s'" % name)
365
      wanted.append(instance)
366

    
367
  else:
368
    wanted = lu.cfg.GetInstanceList()
369
  return utils.NiceSort(wanted)
370

    
371

    
372
def _CheckOutputFields(static, dynamic, selected):
373
  """Checks whether all selected fields are valid.
374

375
  Args:
376
    static: Static fields
377
    dynamic: Dynamic fields
378

379
  """
380
  static_fields = frozenset(static)
381
  dynamic_fields = frozenset(dynamic)
382

    
383
  all_fields = static_fields | dynamic_fields
384

    
385
  if not all_fields.issuperset(selected):
386
    raise errors.OpPrereqError("Unknown output fields selected: %s"
387
                               % ",".join(frozenset(selected).
388
                                          difference(all_fields)))
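# Illustrative example, not part of the original code: with static=["name"]
# and dynamic=["mfree"], a call such as
#
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=["name", "bootid"])
#
# raises OpPrereqError("Unknown output fields selected: bootid"), while
# selected=["name", "mfree"] passes silently.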
389

    
390

    
391
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
392
                          memory, vcpus, nics):
393
  """Builds instance related env variables for hooks from single variables.
394

395
  Args:
396
    secondary_nodes: List of secondary nodes as strings
397
  """
398
  env = {
399
    "OP_TARGET": name,
400
    "INSTANCE_NAME": name,
401
    "INSTANCE_PRIMARY": primary_node,
402
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
403
    "INSTANCE_OS_TYPE": os_type,
404
    "INSTANCE_STATUS": status,
405
    "INSTANCE_MEMORY": memory,
406
    "INSTANCE_VCPUS": vcpus,
407
  }
408

    
409
  if nics:
410
    nic_count = len(nics)
411
    for idx, (ip, bridge, mac) in enumerate(nics):
412
      if ip is None:
413
        ip = ""
414
      env["INSTANCE_NIC%d_IP" % idx] = ip
415
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
416
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
417
  else:
418
    nic_count = 0
419

    
420
  env["INSTANCE_NIC_COUNT"] = nic_count
421

    
422
  return env
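# Illustrative example, not part of the original code (hostnames, bridge and
# MAC address are made up): for a single-NIC instance this returns roughly
#
#   {"OP_TARGET": "inst1.example.com", "INSTANCE_NAME": "inst1.example.com",
#    "INSTANCE_PRIMARY": "node1.example.com", "INSTANCE_SECONDARIES": "",
#    "INSTANCE_OS_TYPE": "debian-etch", "INSTANCE_STATUS": "up",
#    "INSTANCE_MEMORY": 512, "INSTANCE_VCPUS": 1, "INSTANCE_NIC_COUNT": 1,
#    "INSTANCE_NIC0_IP": "", "INSTANCE_NIC0_BRIDGE": "xen-br0",
#    "INSTANCE_NIC0_HWADDR": "aa:00:00:11:22:33"}
#
# The GANETI_ prefix is added later by the hooks runner.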
423

    
424

    
425
def _BuildInstanceHookEnvByObject(instance, override=None):
426
  """Builds instance related env variables for hooks from an object.
427

428
  Args:
429
    instance: objects.Instance object of instance
430
    override: dict of values to override
431
  """
432
  args = {
433
    'name': instance.name,
434
    'primary_node': instance.primary_node,
435
    'secondary_nodes': instance.secondary_nodes,
436
    'os_type': instance.os,
437
    'status': instance.status,
438
    'memory': instance.memory,
439
    'vcpus': instance.vcpus,
440
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
441
  }
442
  if override:
443
    args.update(override)
444
  return _BuildInstanceHookEnv(**args)
445

    
446

    
447
def _CheckInstanceBridgesExist(instance):
448
  """Check that the brigdes needed by an instance exist.
449

450
  """
451
  # check bridges existence
452
  brlist = [nic.bridge for nic in instance.nics]
453
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
454
    raise errors.OpPrereqError("one or more target bridges %s does not"
455
                               " exist on destination node '%s'" %
456
                               (brlist, instance.primary_node))
457

    
458

    
459
class LUDestroyCluster(NoHooksLU):
460
  """Logical unit for destroying the cluster.
461

462
  """
463
  _OP_REQP = []
464

    
465
  def CheckPrereq(self):
466
    """Check prerequisites.
467

468
    This checks whether the cluster is empty.
469

470
    Any errors are signalled by raising errors.OpPrereqError.
471

472
    """
473
    master = self.sstore.GetMasterNode()
474

    
475
    nodelist = self.cfg.GetNodeList()
476
    if len(nodelist) != 1 or nodelist[0] != master:
477
      raise errors.OpPrereqError("There are still %d node(s) in"
478
                                 " this cluster." % (len(nodelist) - 1))
479
    instancelist = self.cfg.GetInstanceList()
480
    if instancelist:
481
      raise errors.OpPrereqError("There are still %d instance(s) in"
482
                                 " this cluster." % len(instancelist))
483

    
484
  def Exec(self, feedback_fn):
485
    """Destroys the cluster.
486

487
    """
488
    master = self.sstore.GetMasterNode()
489
    if not rpc.call_node_stop_master(master, False):
490
      raise errors.OpExecError("Could not disable the master role")
491
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
492
    utils.CreateBackup(priv_key)
493
    utils.CreateBackup(pub_key)
494
    return master
495

    
496

    
497
class LUVerifyCluster(LogicalUnit):
498
  """Verifies the cluster status.
499

500
  """
501
  HPATH = "cluster-verify"
502
  HTYPE = constants.HTYPE_CLUSTER
503
  _OP_REQP = ["skip_checks"]
504
  REQ_BGL = False
505

    
506
  def ExpandNames(self):
507
    self.needed_locks = {
508
      locking.LEVEL_NODE: locking.ALL_SET,
509
      locking.LEVEL_INSTANCE: locking.ALL_SET,
510
    }
511
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
512

    
513
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
514
                  remote_version, feedback_fn):
515
    """Run multiple tests against a node.
516

517
    Test list:
518
      - compares ganeti version
519
      - checks vg existence and size > 20G
520
      - checks config file checksum
521
      - checks ssh to other nodes
522

523
    Args:
524
      node: name of the node to check
525
      file_list: required list of files
526
      local_cksum: dictionary of local files and their checksums
527

528
    """
529
    # compares ganeti version
530
    local_version = constants.PROTOCOL_VERSION
531
    if not remote_version:
532
      feedback_fn("  - ERROR: connection to %s failed" % (node))
533
      return True
534

    
535
    if local_version != remote_version:
536
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
537
                      (local_version, node, remote_version))
538
      return True
539

    
540
    # checks vg existence and size > 20G
541

    
542
    bad = False
543
    if not vglist:
544
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
545
                      (node,))
546
      bad = True
547
    else:
548
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
549
                                            constants.MIN_VG_SIZE)
550
      if vgstatus:
551
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
552
        bad = True
553

    
554
    # checks config file checksum
555
    # checks ssh to any
556

    
557
    if 'filelist' not in node_result:
558
      bad = True
559
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
560
    else:
561
      remote_cksum = node_result['filelist']
562
      for file_name in file_list:
563
        if file_name not in remote_cksum:
564
          bad = True
565
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
566
        elif remote_cksum[file_name] != local_cksum[file_name]:
567
          bad = True
568
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
569

    
570
    if 'nodelist' not in node_result:
571
      bad = True
572
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
573
    else:
574
      if node_result['nodelist']:
575
        bad = True
576
        for node in node_result['nodelist']:
577
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
578
                          (node, node_result['nodelist'][node]))
579
    if 'node-net-test' not in node_result:
580
      bad = True
581
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
582
    else:
583
      if node_result['node-net-test']:
584
        bad = True
585
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
586
        for node in nlist:
587
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
588
                          (node, node_result['node-net-test'][node]))
589

    
590
    hyp_result = node_result.get('hypervisor', None)
591
    if hyp_result is not None:
592
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
593
    return bad
594

    
595
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
596
                      node_instance, feedback_fn):
597
    """Verify an instance.
598

599
    This function checks to see if the required block devices are
600
    available on the instance's node.
601

602
    """
603
    bad = False
604

    
605
    node_current = instanceconfig.primary_node
606

    
607
    node_vol_should = {}
608
    instanceconfig.MapLVsByNode(node_vol_should)
609

    
610
    for node in node_vol_should:
611
      for volume in node_vol_should[node]:
612
        if node not in node_vol_is or volume not in node_vol_is[node]:
613
          feedback_fn("  - ERROR: volume %s missing on node %s" %
614
                          (volume, node))
615
          bad = True
616

    
617
    if not instanceconfig.status == 'down':
618
      if (node_current not in node_instance or
619
          not instance in node_instance[node_current]):
620
        feedback_fn("  - ERROR: instance %s not running on node %s" %
621
                        (instance, node_current))
622
        bad = True
623

    
624
    for node in node_instance:
625
      if (not node == node_current):
626
        if instance in node_instance[node]:
627
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
628
                          (instance, node))
629
          bad = True
630

    
631
    return bad
632

    
633
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
634
    """Verify if there are any unknown volumes in the cluster.
635

636
    The .os, .swap and backup volumes are ignored. All other volumes are
637
    reported as unknown.
638

639
    """
640
    bad = False
641

    
642
    for node in node_vol_is:
643
      for volume in node_vol_is[node]:
644
        if node not in node_vol_should or volume not in node_vol_should[node]:
645
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
646
                      (volume, node))
647
          bad = True
648
    return bad
649

    
650
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
651
    """Verify the list of running instances.
652

653
    This checks what instances are running but unknown to the cluster.
654

655
    """
656
    bad = False
657
    for node in node_instance:
658
      for runninginstance in node_instance[node]:
659
        if runninginstance not in instancelist:
660
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
661
                          (runninginstance, node))
662
          bad = True
663
    return bad
664

    
665
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
666
    """Verify N+1 Memory Resilience.
667

668
    Check that if one single node dies we can still start all the instances it
669
    was primary for.
670

671
    """
672
    bad = False
673

    
674
    for node, nodeinfo in node_info.iteritems():
675
      # This code checks that every node which is now listed as secondary has
676
      # enough memory to host all instances it is supposed to should a single
677
      # other node in the cluster fail.
678
      # FIXME: not ready for failover to an arbitrary node
679
      # FIXME: does not support file-backed instances
680
      # WARNING: we currently take into account down instances as well as up
681
      # ones, considering that even if they're down someone might want to start
682
      # them even in the event of a node failure.
683
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
684
        needed_mem = 0
685
        for instance in instances:
686
          needed_mem += instance_cfg[instance].memory
687
        if nodeinfo['mfree'] < needed_mem:
688
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
689
                      " failovers should node %s fail" % (node, prinode))
690
          bad = True
691
    return bad
692

    
693
  def CheckPrereq(self):
694
    """Check prerequisites.
695

696
    Transform the list of checks we're going to skip into a set and check that
697
    all its members are valid.
698

699
    """
700
    self.skip_set = frozenset(self.op.skip_checks)
701
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
702
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
703

    
704
  def BuildHooksEnv(self):
705
    """Build hooks env.
706

707
    Cluster-Verify hooks are only run in the post phase, and their failure makes
708
    the output be logged in the verify output and the verification fail.
709

710
    """
711
    all_nodes = self.cfg.GetNodeList()
712
    # TODO: populate the environment with useful information for verify hooks
713
    env = {}
714
    return env, [], all_nodes
715

    
716
  def Exec(self, feedback_fn):
717
    """Verify integrity of cluster, performing various test on nodes.
718

719
    """
720
    bad = False
721
    feedback_fn("* Verifying global settings")
722
    for msg in self.cfg.VerifyConfig():
723
      feedback_fn("  - ERROR: %s" % msg)
724

    
725
    vg_name = self.cfg.GetVGName()
726
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
727
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
728
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
729
    i_non_redundant = [] # Non redundant instances
730
    node_volume = {}
731
    node_instance = {}
732
    node_info = {}
733
    instance_cfg = {}
734

    
735
    # FIXME: verify OS list
736
    # do local checksums
737
    file_names = list(self.sstore.GetFileList())
738
    file_names.append(constants.SSL_CERT_FILE)
739
    file_names.append(constants.CLUSTER_CONF_FILE)
740
    local_checksums = utils.FingerprintFiles(file_names)
741

    
742
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
743
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
744
    all_instanceinfo = rpc.call_instance_list(nodelist)
745
    all_vglist = rpc.call_vg_list(nodelist)
746
    node_verify_param = {
747
      'filelist': file_names,
748
      'nodelist': nodelist,
749
      'hypervisor': None,
750
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
751
                        for node in nodeinfo]
752
      }
753
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
754
    all_rversion = rpc.call_version(nodelist)
755
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
756

    
757
    for node in nodelist:
758
      feedback_fn("* Verifying node %s" % node)
759
      result = self._VerifyNode(node, file_names, local_checksums,
760
                                all_vglist[node], all_nvinfo[node],
761
                                all_rversion[node], feedback_fn)
762
      bad = bad or result
763

    
764
      # node_volume
765
      volumeinfo = all_volumeinfo[node]
766

    
767
      if isinstance(volumeinfo, basestring):
768
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
769
                    (node, volumeinfo[-400:].encode('string_escape')))
770
        bad = True
771
        node_volume[node] = {}
772
      elif not isinstance(volumeinfo, dict):
773
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
774
        bad = True
775
        continue
776
      else:
777
        node_volume[node] = volumeinfo
778

    
779
      # node_instance
780
      nodeinstance = all_instanceinfo[node]
781
      if type(nodeinstance) != list:
782
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
783
        bad = True
784
        continue
785

    
786
      node_instance[node] = nodeinstance
787

    
788
      # node_info
789
      nodeinfo = all_ninfo[node]
790
      if not isinstance(nodeinfo, dict):
791
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
792
        bad = True
793
        continue
794

    
795
      try:
796
        node_info[node] = {
797
          "mfree": int(nodeinfo['memory_free']),
798
          "dfree": int(nodeinfo['vg_free']),
799
          "pinst": [],
800
          "sinst": [],
801
          # dictionary holding all instances this node is secondary for,
802
          # grouped by their primary node. Each key is a cluster node, and each
803
          # value is a list of instances which have the key as primary and the
804
          # current node as secondary.  this is handy to calculate N+1 memory
805
          # availability if you can only failover from a primary to its
806
          # secondary.
807
          "sinst-by-pnode": {},
808
        }
809
      except ValueError:
810
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
811
        bad = True
812
        continue
813

    
814
    node_vol_should = {}
815

    
816
    for instance in instancelist:
817
      feedback_fn("* Verifying instance %s" % instance)
818
      inst_config = self.cfg.GetInstanceInfo(instance)
819
      result =  self._VerifyInstance(instance, inst_config, node_volume,
820
                                     node_instance, feedback_fn)
821
      bad = bad or result
822

    
823
      inst_config.MapLVsByNode(node_vol_should)
824

    
825
      instance_cfg[instance] = inst_config
826

    
827
      pnode = inst_config.primary_node
828
      if pnode in node_info:
829
        node_info[pnode]['pinst'].append(instance)
830
      else:
831
        feedback_fn("  - ERROR: instance %s, connection to primary node"
832
                    " %s failed" % (instance, pnode))
833
        bad = True
834

    
835
      # If the instance is non-redundant we cannot survive losing its primary
836
      # node, so we are not N+1 compliant. On the other hand we have no disk
837
      # templates with more than one secondary so that situation is not well
838
      # supported either.
839
      # FIXME: does not support file-backed instances
840
      if len(inst_config.secondary_nodes) == 0:
841
        i_non_redundant.append(instance)
842
      elif len(inst_config.secondary_nodes) > 1:
843
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
844
                    % instance)
845

    
846
      for snode in inst_config.secondary_nodes:
847
        if snode in node_info:
848
          node_info[snode]['sinst'].append(instance)
849
          if pnode not in node_info[snode]['sinst-by-pnode']:
850
            node_info[snode]['sinst-by-pnode'][pnode] = []
851
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
852
        else:
853
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
854
                      " %s failed" % (instance, snode))
855

    
856
    feedback_fn("* Verifying orphan volumes")
857
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
858
                                       feedback_fn)
859
    bad = bad or result
860

    
861
    feedback_fn("* Verifying remaining instances")
862
    result = self._VerifyOrphanInstances(instancelist, node_instance,
863
                                         feedback_fn)
864
    bad = bad or result
865

    
866
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
867
      feedback_fn("* Verifying N+1 Memory redundancy")
868
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
869
      bad = bad or result
870

    
871
    feedback_fn("* Other Notes")
872
    if i_non_redundant:
873
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
874
                  % len(i_non_redundant))
875

    
876
    return not bad
877

    
878
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
879
    """Analize the post-hooks' result, handle it, and send some
880
    nicely-formatted feedback back to the user.
881

882
    Args:
883
      phase: the hooks phase that has just been run
884
      hooks_results: the results of the multi-node hooks rpc call
885
      feedback_fn: function to send feedback back to the caller
886
      lu_result: previous Exec result
887

888
    """
889
    # We only really run POST phase hooks, and are only interested in
890
    # their results
891
    if phase == constants.HOOKS_PHASE_POST:
892
      # Used to change hooks' output to proper indentation
893
      indent_re = re.compile('^', re.M)
894
      feedback_fn("* Hooks Results")
895
      if not hooks_results:
896
        feedback_fn("  - ERROR: general communication failure")
897
        lu_result = 1
898
      else:
899
        for node_name in hooks_results:
900
          show_node_header = True
901
          res = hooks_results[node_name]
902
          if res is False or not isinstance(res, list):
903
            feedback_fn("    Communication failure")
904
            lu_result = 1
905
            continue
906
          for script, hkr, output in res:
907
            if hkr == constants.HKR_FAIL:
908
              # The node header is only shown once, if there are
909
              # failing hooks on that node
910
              if show_node_header:
911
                feedback_fn("  Node %s:" % node_name)
912
                show_node_header = False
913
              feedback_fn("    ERROR: Script %s failed, output:" % script)
914
              output = indent_re.sub('      ', output)
915
              feedback_fn("%s" % output)
916
              lu_result = 1
917

    
918
      return lu_result
919

    
920

    
921
class LUVerifyDisks(NoHooksLU):
922
  """Verifies the cluster disks status.
923

924
  """
925
  _OP_REQP = []
926
  REQ_BGL = False
927

    
928
  def ExpandNames(self):
929
    self.needed_locks = {
930
      locking.LEVEL_NODE: locking.ALL_SET,
931
      locking.LEVEL_INSTANCE: locking.ALL_SET,
932
    }
933
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
934

    
935
  def CheckPrereq(self):
936
    """Check prerequisites.
937

938
    This has no prerequisites.
939

940
    """
941
    pass
942

    
943
  def Exec(self, feedback_fn):
944
    """Verify integrity of cluster disks.
945

946
    """
947
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
948

    
949
    vg_name = self.cfg.GetVGName()
950
    nodes = utils.NiceSort(self.cfg.GetNodeList())
951
    instances = [self.cfg.GetInstanceInfo(name)
952
                 for name in self.cfg.GetInstanceList()]
953

    
954
    nv_dict = {}
955
    for inst in instances:
956
      inst_lvs = {}
957
      if (inst.status != "up" or
958
          inst.disk_template not in constants.DTS_NET_MIRROR):
959
        continue
960
      inst.MapLVsByNode(inst_lvs)
961
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
962
      for node, vol_list in inst_lvs.iteritems():
963
        for vol in vol_list:
964
          nv_dict[(node, vol)] = inst
965

    
966
    if not nv_dict:
967
      return result
968

    
969
    node_lvs = rpc.call_volume_list(nodes, vg_name)
970

    
971
    to_act = set()
972
    for node in nodes:
973
      # node_volume
974
      lvs = node_lvs[node]
975

    
976
      if isinstance(lvs, basestring):
977
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
978
        res_nlvm[node] = lvs
979
      elif not isinstance(lvs, dict):
980
        logger.Info("connection to node %s failed or invalid data returned" %
981
                    (node,))
982
        res_nodes.append(node)
983
        continue
984

    
985
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
986
        inst = nv_dict.pop((node, lv_name), None)
987
        if (not lv_online and inst is not None
988
            and inst.name not in res_instances):
989
          res_instances.append(inst.name)
990

    
991
    # any leftover items in nv_dict are missing LVs, let's arrange the
992
    # data better
993
    for key, inst in nv_dict.iteritems():
994
      if inst.name not in res_missing:
995
        res_missing[inst.name] = []
996
      res_missing[inst.name].append(key)
997

    
998
    return result
999

    
1000

    
1001
class LURenameCluster(LogicalUnit):
1002
  """Rename the cluster.
1003

1004
  """
1005
  HPATH = "cluster-rename"
1006
  HTYPE = constants.HTYPE_CLUSTER
1007
  _OP_REQP = ["name"]
1008
  REQ_WSSTORE = True
1009

    
1010
  def BuildHooksEnv(self):
1011
    """Build hooks env.
1012

1013
    """
1014
    env = {
1015
      "OP_TARGET": self.sstore.GetClusterName(),
1016
      "NEW_NAME": self.op.name,
1017
      }
1018
    mn = self.sstore.GetMasterNode()
1019
    return env, [mn], [mn]
1020

    
1021
  def CheckPrereq(self):
1022
    """Verify that the passed name is a valid one.
1023

1024
    """
1025
    hostname = utils.HostInfo(self.op.name)
1026

    
1027
    new_name = hostname.name
1028
    self.ip = new_ip = hostname.ip
1029
    old_name = self.sstore.GetClusterName()
1030
    old_ip = self.sstore.GetMasterIP()
1031
    if new_name == old_name and new_ip == old_ip:
1032
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1033
                                 " cluster has changed")
1034
    if new_ip != old_ip:
1035
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1036
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1037
                                   " reachable on the network. Aborting." %
1038
                                   new_ip)
1039

    
1040
    self.op.name = new_name
1041

    
1042
  def Exec(self, feedback_fn):
1043
    """Rename the cluster.
1044

1045
    """
1046
    clustername = self.op.name
1047
    ip = self.ip
1048
    ss = self.sstore
1049

    
1050
    # shutdown the master IP
1051
    master = ss.GetMasterNode()
1052
    if not rpc.call_node_stop_master(master, False):
1053
      raise errors.OpExecError("Could not disable the master role")
1054

    
1055
    try:
1056
      # modify the sstore
1057
      ss.SetKey(ss.SS_MASTER_IP, ip)
1058
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1059

    
1060
      # Distribute updated ss config to all nodes
1061
      myself = self.cfg.GetNodeInfo(master)
1062
      dist_nodes = self.cfg.GetNodeList()
1063
      if myself.name in dist_nodes:
1064
        dist_nodes.remove(myself.name)
1065

    
1066
      logger.Debug("Copying updated ssconf data to all nodes")
1067
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1068
        fname = ss.KeyToFilename(keyname)
1069
        result = rpc.call_upload_file(dist_nodes, fname)
1070
        for to_node in dist_nodes:
1071
          if not result[to_node]:
1072
            logger.Error("copy of file %s to node %s failed" %
1073
                         (fname, to_node))
1074
    finally:
1075
      if not rpc.call_node_start_master(master, False):
1076
        logger.Error("Could not re-enable the master role on the master,"
1077
                     " please restart manually.")
1078

    
1079

    
1080
def _RecursiveCheckIfLVMBased(disk):
1081
  """Check if the given disk or its children are lvm-based.
1082

1083
  Args:
1084
    disk: ganeti.objects.Disk object
1085

1086
  Returns:
1087
    boolean indicating whether a LD_LV dev_type was found or not
1088

1089
  """
1090
  if disk.children:
1091
    for chdisk in disk.children:
1092
      if _RecursiveCheckIfLVMBased(chdisk):
1093
        return True
1094
  return disk.dev_type == constants.LD_LV
1095

    
1096

    
1097
class LUSetClusterParams(LogicalUnit):
1098
  """Change the parameters of the cluster.
1099

1100
  """
1101
  HPATH = "cluster-modify"
1102
  HTYPE = constants.HTYPE_CLUSTER
1103
  _OP_REQP = []
1104
  REQ_BGL = False
1105

    
1106
  def ExpandNames(self):
1107
    # FIXME: in the future maybe other cluster params won't require checking on
1108
    # all nodes to be modified.
1109
    self.needed_locks = {
1110
      locking.LEVEL_NODE: locking.ALL_SET,
1111
    }
1112
    self.share_locks[locking.LEVEL_NODE] = 1
1113

    
1114
  def BuildHooksEnv(self):
1115
    """Build hooks env.
1116

1117
    """
1118
    env = {
1119
      "OP_TARGET": self.sstore.GetClusterName(),
1120
      "NEW_VG_NAME": self.op.vg_name,
1121
      }
1122
    mn = self.sstore.GetMasterNode()
1123
    return env, [mn], [mn]
1124

    
1125
  def CheckPrereq(self):
1126
    """Check prerequisites.
1127

1128
    This checks whether the given params don't conflict and
1129
    if the given volume group is valid.
1130

1131
    """
1132
    # FIXME: This only works because there is only one parameter that can be
1133
    # changed or removed.
1134
    if not self.op.vg_name:
1135
      instances = self.cfg.GetAllInstancesInfo().values()
1136
      for inst in instances:
1137
        for disk in inst.disks:
1138
          if _RecursiveCheckIfLVMBased(disk):
1139
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1140
                                       " lvm-based instances exist")
1141

    
1142
    # if vg_name not None, checks given volume group on all nodes
1143
    if self.op.vg_name:
1144
      node_list = self.acquired_locks[locking.LEVEL_NODE]
1145
      vglist = rpc.call_vg_list(node_list)
1146
      for node in node_list:
1147
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1148
                                              constants.MIN_VG_SIZE)
1149
        if vgstatus:
1150
          raise errors.OpPrereqError("Error on node '%s': %s" %
1151
                                     (node, vgstatus))
1152

    
1153
  def Exec(self, feedback_fn):
1154
    """Change the parameters of the cluster.
1155

1156
    """
1157
    if self.op.vg_name != self.cfg.GetVGName():
1158
      self.cfg.SetVGName(self.op.vg_name)
1159
    else:
1160
      feedback_fn("Cluster LVM configuration already in desired"
1161
                  " state, not changing")
1162

    
1163

    
1164
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1165
  """Sleep and poll for an instance's disk to sync.
1166

1167
  """
1168
  if not instance.disks:
1169
    return True
1170

    
1171
  if not oneshot:
1172
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1173

    
1174
  node = instance.primary_node
1175

    
1176
  for dev in instance.disks:
1177
    cfgw.SetDiskID(dev, node)
1178

    
1179
  retries = 0
1180
  while True:
1181
    max_time = 0
1182
    done = True
1183
    cumul_degraded = False
1184
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1185
    if not rstats:
1186
      proc.LogWarning("Can't get any data from node %s" % node)
1187
      retries += 1
1188
      if retries >= 10:
1189
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1190
                                 " aborting." % node)
1191
      time.sleep(6)
1192
      continue
1193
    retries = 0
1194
    for i in range(len(rstats)):
1195
      mstat = rstats[i]
1196
      if mstat is None:
1197
        proc.LogWarning("Can't compute data for node %s/%s" %
1198
                        (node, instance.disks[i].iv_name))
1199
        continue
1200
      # we ignore the ldisk parameter
1201
      perc_done, est_time, is_degraded, _ = mstat
1202
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1203
      if perc_done is not None:
1204
        done = False
1205
        if est_time is not None:
1206
          rem_time = "%d estimated seconds remaining" % est_time
1207
          max_time = est_time
1208
        else:
1209
          rem_time = "no time estimate"
1210
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
1211
                     (instance.disks[i].iv_name, perc_done, rem_time))
1212
    if done or oneshot:
1213
      break
1214

    
1215
    time.sleep(min(60, max_time))
1216

    
1217
  if done:
1218
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1219
  return not cumul_degraded
1220

    
1221

    
1222
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1223
  """Check that mirrors are not degraded.
1224

1225
  The ldisk parameter, if True, will change the test from the
1226
  is_degraded attribute (which represents overall non-ok status for
1227
  the device(s)) to the ldisk (representing the local storage status).
1228

1229
  """
1230
  cfgw.SetDiskID(dev, node)
1231
  if ldisk:
1232
    idx = 6
1233
  else:
1234
    idx = 5
1235

    
1236
  result = True
1237
  if on_primary or dev.AssembleOnSecondary():
1238
    rstats = rpc.call_blockdev_find(node, dev)
1239
    if not rstats:
1240
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1241
      result = False
1242
    else:
1243
      result = result and (not rstats[idx])
1244
  if dev.children:
1245
    for child in dev.children:
1246
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1247

    
1248
  return result
1249

    
1250

    
1251
class LUDiagnoseOS(NoHooksLU):
1252
  """Logical unit for OS diagnose/query.
1253

1254
  """
1255
  _OP_REQP = ["output_fields", "names"]
1256
  REQ_BGL = False
1257

    
1258
  def ExpandNames(self):
1259
    if self.op.names:
1260
      raise errors.OpPrereqError("Selective OS query not supported")
1261

    
1262
    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1263
    _CheckOutputFields(static=[],
1264
                       dynamic=self.dynamic_fields,
1265
                       selected=self.op.output_fields)
1266

    
1267
    # Lock all nodes, in shared mode
1268
    self.needed_locks = {}
1269
    self.share_locks[locking.LEVEL_NODE] = 1
1270
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1271

    
1272
  def CheckPrereq(self):
1273
    """Check prerequisites.
1274

1275
    """
1276

    
1277
  @staticmethod
1278
  def _DiagnoseByOS(node_list, rlist):
1279
    """Remaps a per-node return list into an a per-os per-node dictionary
1280

1281
      Args:
1282
        node_list: a list with the names of all nodes
1283
        rlist: a map with node names as keys and OS objects as values
1284

1285
      Returns:
1286
        map: a map with osnames as keys and as value another map, with
1287
             nodes as
1288
             keys and list of OS objects as values
1289
             e.g. {"debian-etch": {"node1": [<object>,...],
1290
                                   "node2": [<object>,]}
1291
                  }
1292

1293
    """
1294
    all_os = {}
1295
    for node_name, nr in rlist.iteritems():
1296
      if not nr:
1297
        continue
1298
      for os_obj in nr:
1299
        if os_obj.name not in all_os:
1300
          # build a list of nodes for this os containing empty lists
1301
          # for each node in node_list
1302
          all_os[os_obj.name] = {}
1303
          for nname in node_list:
1304
            all_os[os_obj.name][nname] = []
1305
        all_os[os_obj.name][node_name].append(os_obj)
1306
    return all_os
1307

    
1308
  def Exec(self, feedback_fn):
1309
    """Compute the list of OSes.
1310

1311
    """
1312
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1313
    node_data = rpc.call_os_diagnose(node_list)
1314
    if node_data == False:
1315
      raise errors.OpExecError("Can't gather the list of OSes")
1316
    pol = self._DiagnoseByOS(node_list, node_data)
1317
    output = []
1318
    for os_name, os_data in pol.iteritems():
1319
      row = []
1320
      for field in self.op.output_fields:
1321
        if field == "name":
1322
          val = os_name
1323
        elif field == "valid":
1324
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1325
        elif field == "node_status":
1326
          val = {}
1327
          for node_name, nos_list in os_data.iteritems():
1328
            val[node_name] = [(v.status, v.path) for v in nos_list]
1329
        else:
1330
          raise errors.ParameterError(field)
1331
        row.append(val)
1332
      output.append(row)
1333

    
1334
    return output
1335

    
1336

    
1337
class LURemoveNode(LogicalUnit):
1338
  """Logical unit for removing a node.
1339

1340
  """
1341
  HPATH = "node-remove"
1342
  HTYPE = constants.HTYPE_NODE
1343
  _OP_REQP = ["node_name"]
1344

    
1345
  def BuildHooksEnv(self):
1346
    """Build hooks env.
1347

1348
    This doesn't run on the target node in the pre phase as a failed
1349
    node would then be impossible to remove.
1350

1351
    """
1352
    env = {
1353
      "OP_TARGET": self.op.node_name,
1354
      "NODE_NAME": self.op.node_name,
1355
      }
1356
    all_nodes = self.cfg.GetNodeList()
1357
    all_nodes.remove(self.op.node_name)
1358
    return env, all_nodes, all_nodes
1359

    
1360
  def CheckPrereq(self):
1361
    """Check prerequisites.
1362

1363
    This checks:
1364
     - the node exists in the configuration
1365
     - it does not have primary or secondary instances
1366
     - it's not the master
1367

1368
    Any errors are signalled by raising errors.OpPrereqError.
1369

1370
    """
1371
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1372
    if node is None:
1373
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1374

    
1375
    instance_list = self.cfg.GetInstanceList()
1376

    
1377
    masternode = self.sstore.GetMasterNode()
1378
    if node.name == masternode:
1379
      raise errors.OpPrereqError("Node is the master node,"
1380
                                 " you need to failover first.")
1381

    
1382
    for instance_name in instance_list:
1383
      instance = self.cfg.GetInstanceInfo(instance_name)
1384
      if node.name == instance.primary_node:
1385
        raise errors.OpPrereqError("Instance %s still running on the node,"
1386
                                   " please remove first." % instance_name)
1387
      if node.name in instance.secondary_nodes:
1388
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1389
                                   " please remove first." % instance_name)
1390
    self.op.node_name = node.name
1391
    self.node = node
1392

    
1393
  def Exec(self, feedback_fn):
1394
    """Removes the node from the cluster.
1395

1396
    """
1397
    node = self.node
1398
    logger.Info("stopping the node daemon and removing configs from node %s" %
1399
                node.name)
1400

    
1401
    self.context.RemoveNode(node.name)
1402

    
1403
    rpc.call_node_leave_cluster(node.name)
1404

    
1405

    
1406
class LUQueryNodes(NoHooksLU):
1407
  """Logical unit for querying nodes.
1408

1409
  """
1410
  _OP_REQP = ["output_fields", "names"]
1411
  REQ_BGL = False
1412

    
1413
  def ExpandNames(self):
1414
    self.dynamic_fields = frozenset([
1415
      "dtotal", "dfree",
1416
      "mtotal", "mnode", "mfree",
1417
      "bootid",
1418
      "ctotal",
1419
      ])
1420

    
1421
    self.static_fields = frozenset([
1422
      "name", "pinst_cnt", "sinst_cnt",
1423
      "pinst_list", "sinst_list",
1424
      "pip", "sip", "tags",
1425
      ])
1426

    
1427
    _CheckOutputFields(static=self.static_fields,
1428
                       dynamic=self.dynamic_fields,
1429
                       selected=self.op.output_fields)
1430

    
1431
    self.needed_locks = {}
1432
    self.share_locks[locking.LEVEL_NODE] = 1
1433

    
1434
    if self.op.names:
1435
      self.wanted = _GetWantedNodes(self, self.op.names)
1436
    else:
1437
      self.wanted = locking.ALL_SET
1438

    
1439
    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
1440
    if self.do_locking:
1441
      # if we don't request only static fields, we need to lock the nodes
1442
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
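    # Illustrative note, not part of the original code: a query for purely
    # static fields such as ["name", "pip", "pinst_cnt"] leaves do_locking
    # False and acquires no node locks, whereas including a dynamic field
    # like "mfree" or "bootid" forces (shared) locking of the wanted nodes.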
1443

    
1444

    
1445
  def CheckPrereq(self):
1446
    """Check prerequisites.
1447

1448
    """
1449
    # The validation of the node list is done in _GetWantedNodes if the
1450
    # list is non-empty; if it's empty, there's no validation to do
1451
    pass
1452

    
1453
  def Exec(self, feedback_fn):
1454
    """Computes the list of nodes and their attributes.
1455

1456
    """
1457
    all_info = self.cfg.GetAllNodesInfo()
1458
    if self.do_locking:
1459
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1460
    elif self.wanted != locking.ALL_SET:
1461
      nodenames = self.wanted
1462
      missing = set(nodenames).difference(all_info.keys())
1463
      if missing:
1464
        raise errors.OpExecError(
1465
          "Some nodes were removed before retrieving their data: %s" % missing)
1466
    else:
1467
      nodenames = all_info.keys()
1468
    nodelist = [all_info[name] for name in nodenames]
1469

    
1470
    # begin data gathering
1471

    
1472
    if self.dynamic_fields.intersection(self.op.output_fields):
1473
      live_data = {}
1474
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1475
      for name in nodenames:
1476
        nodeinfo = node_data.get(name, None)
1477
        if nodeinfo:
1478
          live_data[name] = {
1479
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1480
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1481
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1482
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1483
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1484
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1485
            "bootid": nodeinfo['bootid'],
1486
            }
1487
        else:
1488
          live_data[name] = {}
1489
    else:
1490
      live_data = dict.fromkeys(nodenames, {})
1491

    
1492
    node_to_primary = dict([(name, set()) for name in nodenames])
1493
    node_to_secondary = dict([(name, set()) for name in nodenames])
1494

    
1495
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1496
                             "sinst_cnt", "sinst_list"))
1497
    if inst_fields & frozenset(self.op.output_fields):
1498
      instancelist = self.cfg.GetInstanceList()
1499

    
1500
      for instance_name in instancelist:
1501
        inst = self.cfg.GetInstanceInfo(instance_name)
1502
        if inst.primary_node in node_to_primary:
1503
          node_to_primary[inst.primary_node].add(inst.name)
1504
        for secnode in inst.secondary_nodes:
1505
          if secnode in node_to_secondary:
1506
            node_to_secondary[secnode].add(inst.name)
1507

    
1508
    # end data gathering
1509

    
1510
    output = []
1511
    for node in nodelist:
1512
      node_output = []
1513
      for field in self.op.output_fields:
1514
        if field == "name":
1515
          val = node.name
1516
        elif field == "pinst_list":
1517
          val = list(node_to_primary[node.name])
1518
        elif field == "sinst_list":
1519
          val = list(node_to_secondary[node.name])
1520
        elif field == "pinst_cnt":
1521
          val = len(node_to_primary[node.name])
1522
        elif field == "sinst_cnt":
1523
          val = len(node_to_secondary[node.name])
1524
        elif field == "pip":
1525
          val = node.primary_ip
1526
        elif field == "sip":
1527
          val = node.secondary_ip
1528
        elif field == "tags":
1529
          val = list(node.GetTags())
1530
        elif field in self.dynamic_fields:
1531
          val = live_data[node.name].get(field, None)
1532
        else:
1533
          raise errors.ParameterError(field)
1534
        node_output.append(val)
1535
      output.append(node_output)
1536

    
1537
    return output
1538

    
1539

    
1540
class LUQueryNodeVolumes(NoHooksLU):
1541
  """Logical unit for getting volumes on node(s).
1542

1543
  """
1544
  _OP_REQP = ["nodes", "output_fields"]
1545
  REQ_BGL = False
1546

    
1547
  def ExpandNames(self):
1548
    _CheckOutputFields(static=["node"],
1549
                       dynamic=["phys", "vg", "name", "size", "instance"],
1550
                       selected=self.op.output_fields)
1551

    
1552
    self.needed_locks = {}
1553
    self.share_locks[locking.LEVEL_NODE] = 1
1554
    if not self.op.nodes:
1555
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1556
    else:
1557
      self.needed_locks[locking.LEVEL_NODE] = \
1558
        _GetWantedNodes(self, self.op.nodes)
1559

    
1560
  def CheckPrereq(self):
1561
    """Check prerequisites.
1562

1563
    This checks that the fields required are valid output fields.
1564

1565
    """
1566
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1567

    
1568
  def Exec(self, feedback_fn):
1569
    """Computes the list of nodes and their attributes.
1570

1571
    """
1572
    nodenames = self.nodes
1573
    volumes = rpc.call_node_volumes(nodenames)
1574

    
1575
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1576
             in self.cfg.GetInstanceList()]
1577

    
1578
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1579

    
1580
    output = []
1581
    for node in nodenames:
1582
      if node not in volumes or not volumes[node]:
1583
        continue
1584

    
1585
      node_vols = volumes[node][:]
1586
      node_vols.sort(key=lambda vol: vol['dev'])
1587

    
1588
      for vol in node_vols:
1589
        node_output = []
1590
        for field in self.op.output_fields:
1591
          if field == "node":
1592
            val = node
1593
          elif field == "phys":
1594
            val = vol['dev']
1595
          elif field == "vg":
1596
            val = vol['vg']
1597
          elif field == "name":
1598
            val = vol['name']
1599
          elif field == "size":
1600
            val = int(float(vol['size']))
1601
          elif field == "instance":
1602
            for inst in ilist:
1603
              if node not in lv_by_node[inst]:
1604
                continue
1605
              if vol['name'] in lv_by_node[inst][node]:
1606
                val = inst.name
1607
                break
1608
            else:
1609
              val = '-'
1610
          else:
1611
            raise errors.ParameterError(field)
1612
          node_output.append(str(val))
1613

    
1614
        output.append(node_output)
1615

    
1616
    return output
1617

    
1618

    
1619
class LUAddNode(LogicalUnit):
1620
  """Logical unit for adding node to the cluster.
1621

1622
  """
1623
  HPATH = "node-add"
1624
  HTYPE = constants.HTYPE_NODE
1625
  _OP_REQP = ["node_name"]
1626

    
1627
  def BuildHooksEnv(self):
1628
    """Build hooks env.
1629

1630
    This will run on all nodes before, and on all nodes + the new node after.
1631

1632
    """
1633
    env = {
1634
      "OP_TARGET": self.op.node_name,
1635
      "NODE_NAME": self.op.node_name,
1636
      "NODE_PIP": self.op.primary_ip,
1637
      "NODE_SIP": self.op.secondary_ip,
1638
      }
1639
    nodes_0 = self.cfg.GetNodeList()
1640
    nodes_1 = nodes_0 + [self.op.node_name, ]
1641
    return env, nodes_0, nodes_1
1642

    
1643
  def CheckPrereq(self):
1644
    """Check prerequisites.
1645

1646
    This checks:
1647
     - the new node is not already in the config
1648
     - it is resolvable
1649
     - its parameters (single/dual homed) matches the cluster
1650

1651
    Any errors are signalled by raising errors.OpPrereqError.
1652

1653
    """
1654
    node_name = self.op.node_name
1655
    cfg = self.cfg
1656

    
1657
    dns_data = utils.HostInfo(node_name)
1658

    
1659
    node = dns_data.name
1660
    primary_ip = self.op.primary_ip = dns_data.ip
1661
    secondary_ip = getattr(self.op, "secondary_ip", None)
1662
    if secondary_ip is None:
1663
      secondary_ip = primary_ip
1664
    if not utils.IsValidIP(secondary_ip):
1665
      raise errors.OpPrereqError("Invalid secondary IP given")
1666
    self.op.secondary_ip = secondary_ip
1667

    
1668
    node_list = cfg.GetNodeList()
1669
    if not self.op.readd and node in node_list:
1670
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1671
                                 node)
1672
    elif self.op.readd and node not in node_list:
1673
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1674

    
1675
    for existing_node_name in node_list:
1676
      existing_node = cfg.GetNodeInfo(existing_node_name)
1677

    
1678
      if self.op.readd and node == existing_node_name:
1679
        if (existing_node.primary_ip != primary_ip or
1680
            existing_node.secondary_ip != secondary_ip):
1681
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1682
                                     " address configuration as before")
1683
        continue
1684

    
1685
      if (existing_node.primary_ip == primary_ip or
1686
          existing_node.secondary_ip == primary_ip or
1687
          existing_node.primary_ip == secondary_ip or
1688
          existing_node.secondary_ip == secondary_ip):
1689
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1690
                                   " existing node %s" % existing_node.name)
1691

    
1692
    # check that the type of the node (single versus dual homed) is the
1693
    # same as for the master
1694
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1695
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1696
    newbie_singlehomed = secondary_ip == primary_ip
1697
    if master_singlehomed != newbie_singlehomed:
1698
      if master_singlehomed:
1699
        raise errors.OpPrereqError("The master has no private ip but the"
1700
                                   " new node has one")
1701
      else:
1702
        raise errors.OpPrereqError("The master has a private ip but the"
1703
                                   " new node doesn't have one")
1704

    
1705
    # checks reachablity
1706
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1707
      raise errors.OpPrereqError("Node not reachable by ping")
1708

    
1709
    if not newbie_singlehomed:
1710
      # check reachability from my secondary ip to newbie's secondary ip
1711
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1712
                           source=myself.secondary_ip):
1713
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1714
                                   " based ping to noded port")
1715

    
1716
    self.new_node = objects.Node(name=node,
1717
                                 primary_ip=primary_ip,
1718
                                 secondary_ip=secondary_ip)
1719

    
1720
  def Exec(self, feedback_fn):
1721
    """Adds the new node to the cluster.
1722

1723
    """
1724
    new_node = self.new_node
1725
    node = new_node.name
1726

    
1727
    # check connectivity
1728
    result = rpc.call_version([node])[node]
1729
    if result:
1730
      if constants.PROTOCOL_VERSION == result:
1731
        logger.Info("communication to node %s fine, sw version %s match" %
1732
                    (node, result))
1733
      else:
1734
        raise errors.OpExecError("Version mismatch master version %s,"
1735
                                 " node version %s" %
1736
                                 (constants.PROTOCOL_VERSION, result))
1737
    else:
1738
      raise errors.OpExecError("Cannot get version from the new node")
1739

    
1740
    # setup ssh on node
1741
    logger.Info("copy ssh key to node %s" % node)
1742
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1743
    keyarray = []
1744
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1745
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1746
                priv_key, pub_key]
1747

    
1748
    for i in keyfiles:
1749
      f = open(i, 'r')
1750
      try:
1751
        keyarray.append(f.read())
1752
      finally:
1753
        f.close()
1754

    
1755
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1756
                               keyarray[3], keyarray[4], keyarray[5])
1757

    
1758
    if not result:
1759
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1760

    
1761
    # Add node to our /etc/hosts, and add key to known_hosts
1762
    utils.AddHostToEtcHosts(new_node.name)
1763

    
1764
    if new_node.secondary_ip != new_node.primary_ip:
1765
      if not rpc.call_node_tcp_ping(new_node.name,
1766
                                    constants.LOCALHOST_IP_ADDRESS,
1767
                                    new_node.secondary_ip,
1768
                                    constants.DEFAULT_NODED_PORT,
1769
                                    10, False):
1770
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1771
                                 " you gave (%s). Please fix and re-run this"
1772
                                 " command." % new_node.secondary_ip)
1773

    
1774
    node_verify_list = [self.sstore.GetMasterNode()]
1775
    node_verify_param = {
1776
      'nodelist': [node],
1777
      # TODO: do a node-net-test as well?
1778
    }
1779

    
1780
    result = rpc.call_node_verify(node_verify_list, node_verify_param)
1781
    for verifier in node_verify_list:
1782
      if not result[verifier]:
1783
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
1784
                                 " for remote verification" % verifier)
1785
      if result[verifier]['nodelist']:
1786
        for failed in result[verifier]['nodelist']:
1787
          feedback_fn("ssh/hostname verification failed %s -> %s" %
1788
                      (verifier, result[verifier]['nodelist'][failed]))
1789
        raise errors.OpExecError("ssh/hostname verification failed.")
1790

    
1791
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1792
    # including the node just added
1793
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1794
    dist_nodes = self.cfg.GetNodeList()
1795
    if not self.op.readd:
1796
      dist_nodes.append(node)
1797
    if myself.name in dist_nodes:
1798
      dist_nodes.remove(myself.name)
1799

    
1800
    logger.Debug("Copying hosts and known_hosts to all nodes")
1801
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1802
      result = rpc.call_upload_file(dist_nodes, fname)
1803
      for to_node in dist_nodes:
1804
        if not result[to_node]:
1805
          logger.Error("copy of file %s to node %s failed" %
1806
                       (fname, to_node))
1807

    
1808
    to_copy = self.sstore.GetFileList()
1809
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1810
      to_copy.append(constants.VNC_PASSWORD_FILE)
1811
    for fname in to_copy:
1812
      result = rpc.call_upload_file([node], fname)
1813
      if not result[node]:
1814
        logger.Error("could not copy file %s to node %s" % (fname, node))
1815

    
1816
    if self.op.readd:
1817
      self.context.ReaddNode(new_node)
1818
    else:
1819
      self.context.AddNode(new_node)
1820

    
1821

    
1822
class LUQueryClusterInfo(NoHooksLU):
1823
  """Query cluster configuration.
1824

1825
  """
1826
  _OP_REQP = []
1827
  REQ_MASTER = False
1828
  REQ_BGL = False
1829

    
1830
  def ExpandNames(self):
1831
    self.needed_locks = {}
1832

    
1833
  def CheckPrereq(self):
1834
    """No prerequsites needed for this LU.
1835

1836
    """
1837
    pass
1838

    
1839
  def Exec(self, feedback_fn):
1840
    """Return cluster config.
1841

1842
    """
1843
    result = {
1844
      "name": self.sstore.GetClusterName(),
1845
      "software_version": constants.RELEASE_VERSION,
1846
      "protocol_version": constants.PROTOCOL_VERSION,
1847
      "config_version": constants.CONFIG_VERSION,
1848
      "os_api_version": constants.OS_API_VERSION,
1849
      "export_version": constants.EXPORT_VERSION,
1850
      "master": self.sstore.GetMasterNode(),
1851
      "architecture": (platform.architecture()[0], platform.machine()),
1852
      "hypervisor_type": self.sstore.GetHypervisorType(),
1853
      }
1854

    
1855
    return result
1856

    
1857

    
1858
class LUDumpClusterConfig(NoHooksLU):
1859
  """Return a text-representation of the cluster-config.
1860

1861
  """
1862
  _OP_REQP = []
1863
  REQ_BGL = False
1864

    
1865
  def ExpandNames(self):
1866
    self.needed_locks = {}
1867

    
1868
  def CheckPrereq(self):
1869
    """No prerequisites.
1870

1871
    """
1872
    pass
1873

    
1874
  def Exec(self, feedback_fn):
1875
    """Dump a representation of the cluster config to the standard output.
1876

1877
    """
1878
    return self.cfg.DumpConfig()
1879

    
1880

    
1881
class LUActivateInstanceDisks(NoHooksLU):
1882
  """Bring up an instance's disks.
1883

1884
  """
1885
  _OP_REQP = ["instance_name"]
1886
  REQ_BGL = False
1887

    
1888
  def ExpandNames(self):
1889
    self._ExpandAndLockInstance()
1890
    self.needed_locks[locking.LEVEL_NODE] = []
1891
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1892

    
1893
  def DeclareLocks(self, level):
1894
    if level == locking.LEVEL_NODE:
1895
      self._LockInstancesNodes()
1896

    
1897
  def CheckPrereq(self):
1898
    """Check prerequisites.
1899

1900
    This checks that the instance is in the cluster.
1901

1902
    """
1903
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1904
    assert self.instance is not None, \
1905
      "Cannot retrieve locked instance %s" % self.op.instance_name
1906

    
1907
  def Exec(self, feedback_fn):
1908
    """Activate the disks.
1909

1910
    """
1911
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1912
    if not disks_ok:
1913
      raise errors.OpExecError("Cannot activate block devices")
1914

    
1915
    return disks_info
1916

    
1917

    
1918
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1919
  """Prepare the block devices for an instance.
1920

1921
  This sets up the block devices on all nodes.
1922

1923
  Args:
1924
    instance: a ganeti.objects.Instance object
1925
    ignore_secondaries: if true, errors on secondary nodes won't result
1926
                        in an error return from the function
1927

1928
  Returns:
1929
    false if the operation failed
1930
    list of (host, instance_visible_name, node_visible_name) if the operation
1931
         suceeded with the mapping from node devices to instance devices
1932
  """
1933
  device_info = []
1934
  disks_ok = True
1935
  iname = instance.name
1936
  # With the two passes mechanism we try to reduce the window of
1937
  # opportunity for the race condition of switching DRBD to primary
1938
  # before handshaking occured, but we do not eliminate it
1939

    
1940
  # The proper fix would be to wait (with some limits) until the
1941
  # connection has been made and drbd transitions from WFConnection
1942
  # into any other network-connected state (Connected, SyncTarget,
1943
  # SyncSource, etc.)
1944

    
1945
  # 1st pass, assemble on all nodes in secondary mode
1946
  for inst_disk in instance.disks:
1947
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1948
      cfg.SetDiskID(node_disk, node)
1949
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1950
      if not result:
1951
        logger.Error("could not prepare block device %s on node %s"
1952
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1953
        if not ignore_secondaries:
1954
          disks_ok = False
1955

    
1956
  # FIXME: race condition on drbd migration to primary
1957

    
1958
  # 2nd pass, do only the primary node
1959
  for inst_disk in instance.disks:
1960
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1961
      if node != instance.primary_node:
1962
        continue
1963
      cfg.SetDiskID(node_disk, node)
1964
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1965
      if not result:
1966
        logger.Error("could not prepare block device %s on node %s"
1967
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1968
        disks_ok = False
1969
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
1970

    
1971
  # leave the disks configured for the primary node
1972
  # this is a workaround that would be fixed better by
1973
  # improving the logical/physical id handling
1974
  for disk in instance.disks:
1975
    cfg.SetDiskID(disk, instance.primary_node)
1976

    
1977
  return disks_ok, device_info
1978

    
1979

    
1980
def _StartInstanceDisks(cfg, instance, force):
1981
  """Start the disks of an instance.
1982

1983
  """
1984
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1985
                                           ignore_secondaries=force)
1986
  if not disks_ok:
1987
    _ShutdownInstanceDisks(instance, cfg)
1988
    if force is not None and not force:
1989
      logger.Error("If the message above refers to a secondary node,"
1990
                   " you can retry the operation using '--force'.")
1991
    raise errors.OpExecError("Disk consistency error")
1992

    
1993

    
1994
class LUDeactivateInstanceDisks(NoHooksLU):
1995
  """Shutdown an instance's disks.
1996

1997
  """
1998
  _OP_REQP = ["instance_name"]
1999
  REQ_BGL = False
2000

    
2001
  def ExpandNames(self):
2002
    self._ExpandAndLockInstance()
2003
    self.needed_locks[locking.LEVEL_NODE] = []
2004
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2005

    
2006
  def DeclareLocks(self, level):
2007
    if level == locking.LEVEL_NODE:
2008
      self._LockInstancesNodes()
2009

    
2010
  def CheckPrereq(self):
2011
    """Check prerequisites.
2012

2013
    This checks that the instance is in the cluster.
2014

2015
    """
2016
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2017
    assert self.instance is not None, \
2018
      "Cannot retrieve locked instance %s" % self.op.instance_name
2019

    
2020
  def Exec(self, feedback_fn):
2021
    """Deactivate the disks
2022

2023
    """
2024
    instance = self.instance
2025
    _SafeShutdownInstanceDisks(instance, self.cfg)
2026

    
2027

    
2028
def _SafeShutdownInstanceDisks(instance, cfg):
2029
  """Shutdown block devices of an instance.
2030

2031
  This function checks if an instance is running, before calling
2032
  _ShutdownInstanceDisks.
2033

2034
  """
2035
  ins_l = rpc.call_instance_list([instance.primary_node])
2036
  ins_l = ins_l[instance.primary_node]
2037
  if not type(ins_l) is list:
2038
    raise errors.OpExecError("Can't contact node '%s'" %
2039
                             instance.primary_node)
2040

    
2041
  if instance.name in ins_l:
2042
    raise errors.OpExecError("Instance is running, can't shutdown"
2043
                             " block devices.")
2044

    
2045
  _ShutdownInstanceDisks(instance, cfg)
2046

    
2047

    
2048
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
2049
  """Shutdown block devices of an instance.
2050

2051
  This does the shutdown on all nodes of the instance.
2052

2053
  If the ignore_primary is false, errors on the primary node are
2054
  ignored.
2055

2056
  """
2057
  result = True
2058
  for disk in instance.disks:
2059
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2060
      cfg.SetDiskID(top_disk, node)
2061
      if not rpc.call_blockdev_shutdown(node, top_disk):
2062
        logger.Error("could not shutdown block device %s on node %s" %
2063
                     (disk.iv_name, node))
2064
        if not ignore_primary or node != instance.primary_node:
2065
          result = False
2066
  return result
2067

    
2068

    
2069
def _CheckNodeFreeMemory(cfg, node, reason, requested):
2070
  """Checks if a node has enough free memory.
2071

2072
  This function check if a given node has the needed amount of free
2073
  memory. In case the node has less memory or we cannot get the
2074
  information from the node, this function raise an OpPrereqError
2075
  exception.
2076

2077
  Args:
2078
    - cfg: a ConfigWriter instance
2079
    - node: the node name
2080
    - reason: string to use in the error message
2081
    - requested: the amount of memory in MiB
2082

2083
  """
2084
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2085
  if not nodeinfo or not isinstance(nodeinfo, dict):
2086
    raise errors.OpPrereqError("Could not contact node %s for resource"
2087
                             " information" % (node,))
2088

    
2089
  free_mem = nodeinfo[node].get('memory_free')
2090
  if not isinstance(free_mem, int):
2091
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2092
                             " was '%s'" % (node, free_mem))
2093
  if requested > free_mem:
2094
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2095
                             " needed %s MiB, available %s MiB" %
2096
                             (node, reason, requested, free_mem))
2097

    
2098

    
2099
class LUStartupInstance(LogicalUnit):
2100
  """Starts an instance.
2101

2102
  """
2103
  HPATH = "instance-start"
2104
  HTYPE = constants.HTYPE_INSTANCE
2105
  _OP_REQP = ["instance_name", "force"]
2106
  REQ_BGL = False
2107

    
2108
  def ExpandNames(self):
2109
    self._ExpandAndLockInstance()
2110
    self.needed_locks[locking.LEVEL_NODE] = []
2111
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2112

    
2113
  def DeclareLocks(self, level):
2114
    if level == locking.LEVEL_NODE:
2115
      self._LockInstancesNodes()
2116

    
2117
  def BuildHooksEnv(self):
2118
    """Build hooks env.
2119

2120
    This runs on master, primary and secondary nodes of the instance.
2121

2122
    """
2123
    env = {
2124
      "FORCE": self.op.force,
2125
      }
2126
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2127
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2128
          list(self.instance.secondary_nodes))
2129
    return env, nl, nl
2130

    
2131
  def CheckPrereq(self):
2132
    """Check prerequisites.
2133

2134
    This checks that the instance is in the cluster.
2135

2136
    """
2137
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2138
    assert self.instance is not None, \
2139
      "Cannot retrieve locked instance %s" % self.op.instance_name
2140

    
2141
    # check bridges existance
2142
    _CheckInstanceBridgesExist(instance)
2143

    
2144
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2145
                         "starting instance %s" % instance.name,
2146
                         instance.memory)
2147

    
2148
  def Exec(self, feedback_fn):
2149
    """Start the instance.
2150

2151
    """
2152
    instance = self.instance
2153
    force = self.op.force
2154
    extra_args = getattr(self.op, "extra_args", "")
2155

    
2156
    self.cfg.MarkInstanceUp(instance.name)
2157

    
2158
    node_current = instance.primary_node
2159

    
2160
    _StartInstanceDisks(self.cfg, instance, force)
2161

    
2162
    if not rpc.call_instance_start(node_current, instance, extra_args):
2163
      _ShutdownInstanceDisks(instance, self.cfg)
2164
      raise errors.OpExecError("Could not start instance")
2165

    
2166

    
2167
class LURebootInstance(LogicalUnit):
2168
  """Reboot an instance.
2169

2170
  """
2171
  HPATH = "instance-reboot"
2172
  HTYPE = constants.HTYPE_INSTANCE
2173
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2174
  REQ_BGL = False
2175

    
2176
  def ExpandNames(self):
2177
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2178
                                   constants.INSTANCE_REBOOT_HARD,
2179
                                   constants.INSTANCE_REBOOT_FULL]:
2180
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2181
                                  (constants.INSTANCE_REBOOT_SOFT,
2182
                                   constants.INSTANCE_REBOOT_HARD,
2183
                                   constants.INSTANCE_REBOOT_FULL))
2184
    self._ExpandAndLockInstance()
2185
    self.needed_locks[locking.LEVEL_NODE] = []
2186
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2187

    
2188
  def DeclareLocks(self, level):
2189
    if level == locking.LEVEL_NODE:
2190
      primary_only = not constants.INSTANCE_REBOOT_FULL
2191
      self._LockInstancesNodes(primary_only=primary_only)
2192

    
2193
  def BuildHooksEnv(self):
2194
    """Build hooks env.
2195

2196
    This runs on master, primary and secondary nodes of the instance.
2197

2198
    """
2199
    env = {
2200
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2201
      }
2202
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2203
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2204
          list(self.instance.secondary_nodes))
2205
    return env, nl, nl
2206

    
2207
  def CheckPrereq(self):
2208
    """Check prerequisites.
2209

2210
    This checks that the instance is in the cluster.
2211

2212
    """
2213
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2214
    assert self.instance is not None, \
2215
      "Cannot retrieve locked instance %s" % self.op.instance_name
2216

    
2217
    # check bridges existance
2218
    _CheckInstanceBridgesExist(instance)
2219

    
2220
  def Exec(self, feedback_fn):
2221
    """Reboot the instance.
2222

2223
    """
2224
    instance = self.instance
2225
    ignore_secondaries = self.op.ignore_secondaries
2226
    reboot_type = self.op.reboot_type
2227
    extra_args = getattr(self.op, "extra_args", "")
2228

    
2229
    node_current = instance.primary_node
2230

    
2231
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2232
                       constants.INSTANCE_REBOOT_HARD]:
2233
      if not rpc.call_instance_reboot(node_current, instance,
2234
                                      reboot_type, extra_args):
2235
        raise errors.OpExecError("Could not reboot instance")
2236
    else:
2237
      if not rpc.call_instance_shutdown(node_current, instance):
2238
        raise errors.OpExecError("could not shutdown instance for full reboot")
2239
      _ShutdownInstanceDisks(instance, self.cfg)
2240
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2241
      if not rpc.call_instance_start(node_current, instance, extra_args):
2242
        _ShutdownInstanceDisks(instance, self.cfg)
2243
        raise errors.OpExecError("Could not start instance for full reboot")
2244

    
2245
    self.cfg.MarkInstanceUp(instance.name)
2246

    
2247

    
2248
class LUShutdownInstance(LogicalUnit):
2249
  """Shutdown an instance.
2250

2251
  """
2252
  HPATH = "instance-stop"
2253
  HTYPE = constants.HTYPE_INSTANCE
2254
  _OP_REQP = ["instance_name"]
2255
  REQ_BGL = False
2256

    
2257
  def ExpandNames(self):
2258
    self._ExpandAndLockInstance()
2259
    self.needed_locks[locking.LEVEL_NODE] = []
2260
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2261

    
2262
  def DeclareLocks(self, level):
2263
    if level == locking.LEVEL_NODE:
2264
      self._LockInstancesNodes()
2265

    
2266
  def BuildHooksEnv(self):
2267
    """Build hooks env.
2268

2269
    This runs on master, primary and secondary nodes of the instance.
2270

2271
    """
2272
    env = _BuildInstanceHookEnvByObject(self.instance)
2273
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2274
          list(self.instance.secondary_nodes))
2275
    return env, nl, nl
2276

    
2277
  def CheckPrereq(self):
2278
    """Check prerequisites.
2279

2280
    This checks that the instance is in the cluster.
2281

2282
    """
2283
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2284
    assert self.instance is not None, \
2285
      "Cannot retrieve locked instance %s" % self.op.instance_name
2286

    
2287
  def Exec(self, feedback_fn):
2288
    """Shutdown the instance.
2289

2290
    """
2291
    instance = self.instance
2292
    node_current = instance.primary_node
2293
    self.cfg.MarkInstanceDown(instance.name)
2294
    if not rpc.call_instance_shutdown(node_current, instance):
2295
      logger.Error("could not shutdown instance")
2296

    
2297
    _ShutdownInstanceDisks(instance, self.cfg)
2298

    
2299

    
2300
class LUReinstallInstance(LogicalUnit):
2301
  """Reinstall an instance.
2302

2303
  """
2304
  HPATH = "instance-reinstall"
2305
  HTYPE = constants.HTYPE_INSTANCE
2306
  _OP_REQP = ["instance_name"]
2307
  REQ_BGL = False
2308

    
2309
  def ExpandNames(self):
2310
    self._ExpandAndLockInstance()
2311
    self.needed_locks[locking.LEVEL_NODE] = []
2312
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2313

    
2314
  def DeclareLocks(self, level):
2315
    if level == locking.LEVEL_NODE:
2316
      self._LockInstancesNodes()
2317

    
2318
  def BuildHooksEnv(self):
2319
    """Build hooks env.
2320

2321
    This runs on master, primary and secondary nodes of the instance.
2322

2323
    """
2324
    env = _BuildInstanceHookEnvByObject(self.instance)
2325
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2326
          list(self.instance.secondary_nodes))
2327
    return env, nl, nl
2328

    
2329
  def CheckPrereq(self):
2330
    """Check prerequisites.
2331

2332
    This checks that the instance is in the cluster and is not running.
2333

2334
    """
2335
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2336
    assert instance is not None, \
2337
      "Cannot retrieve locked instance %s" % self.op.instance_name
2338

    
2339
    if instance.disk_template == constants.DT_DISKLESS:
2340
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2341
                                 self.op.instance_name)
2342
    if instance.status != "down":
2343
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2344
                                 self.op.instance_name)
2345
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2346
    if remote_info:
2347
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2348
                                 (self.op.instance_name,
2349
                                  instance.primary_node))
2350

    
2351
    self.op.os_type = getattr(self.op, "os_type", None)
2352
    if self.op.os_type is not None:
2353
      # OS verification
2354
      pnode = self.cfg.GetNodeInfo(
2355
        self.cfg.ExpandNodeName(instance.primary_node))
2356
      if pnode is None:
2357
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2358
                                   self.op.pnode)
2359
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2360
      if not os_obj:
2361
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2362
                                   " primary node"  % self.op.os_type)
2363

    
2364
    self.instance = instance
2365

    
2366
  def Exec(self, feedback_fn):
2367
    """Reinstall the instance.
2368

2369
    """
2370
    inst = self.instance
2371

    
2372
    if self.op.os_type is not None:
2373
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2374
      inst.os = self.op.os_type
2375
      self.cfg.AddInstance(inst)
2376

    
2377
    _StartInstanceDisks(self.cfg, inst, None)
2378
    try:
2379
      feedback_fn("Running the instance OS create scripts...")
2380
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2381
        raise errors.OpExecError("Could not install OS for instance %s"
2382
                                 " on node %s" %
2383
                                 (inst.name, inst.primary_node))
2384
    finally:
2385
      _ShutdownInstanceDisks(inst, self.cfg)
2386

    
2387

    
2388
class LURenameInstance(LogicalUnit):
2389
  """Rename an instance.
2390

2391
  """
2392
  HPATH = "instance-rename"
2393
  HTYPE = constants.HTYPE_INSTANCE
2394
  _OP_REQP = ["instance_name", "new_name"]
2395

    
2396
  def BuildHooksEnv(self):
2397
    """Build hooks env.
2398

2399
    This runs on master, primary and secondary nodes of the instance.
2400

2401
    """
2402
    env = _BuildInstanceHookEnvByObject(self.instance)
2403
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2404
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2405
          list(self.instance.secondary_nodes))
2406
    return env, nl, nl
2407

    
2408
  def CheckPrereq(self):
2409
    """Check prerequisites.
2410

2411
    This checks that the instance is in the cluster and is not running.
2412

2413
    """
2414
    instance = self.cfg.GetInstanceInfo(
2415
      self.cfg.ExpandInstanceName(self.op.instance_name))
2416
    if instance is None:
2417
      raise errors.OpPrereqError("Instance '%s' not known" %
2418
                                 self.op.instance_name)
2419
    if instance.status != "down":
2420
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2421
                                 self.op.instance_name)
2422
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2423
    if remote_info:
2424
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2425
                                 (self.op.instance_name,
2426
                                  instance.primary_node))
2427
    self.instance = instance
2428

    
2429
    # new name verification
2430
    name_info = utils.HostInfo(self.op.new_name)
2431

    
2432
    self.op.new_name = new_name = name_info.name
2433
    instance_list = self.cfg.GetInstanceList()
2434
    if new_name in instance_list:
2435
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2436
                                 new_name)
2437

    
2438
    if not getattr(self.op, "ignore_ip", False):
2439
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2440
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2441
                                   (name_info.ip, new_name))
2442

    
2443

    
2444
  def Exec(self, feedback_fn):
2445
    """Reinstall the instance.
2446

2447
    """
2448
    inst = self.instance
2449
    old_name = inst.name
2450

    
2451
    if inst.disk_template == constants.DT_FILE:
2452
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2453

    
2454
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2455
    # Change the instance lock. This is definitely safe while we hold the BGL
2456
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2457
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2458

    
2459
    # re-read the instance from the configuration after rename
2460
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2461

    
2462
    if inst.disk_template == constants.DT_FILE:
2463
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2464
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
2465
                                                old_file_storage_dir,
2466
                                                new_file_storage_dir)
2467

    
2468
      if not result:
2469
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2470
                                 " directory '%s' to '%s' (but the instance"
2471
                                 " has been renamed in Ganeti)" % (
2472
                                 inst.primary_node, old_file_storage_dir,
2473
                                 new_file_storage_dir))
2474

    
2475
      if not result[0]:
2476
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2477
                                 " (but the instance has been renamed in"
2478
                                 " Ganeti)" % (old_file_storage_dir,
2479
                                               new_file_storage_dir))
2480

    
2481
    _StartInstanceDisks(self.cfg, inst, None)
2482
    try:
2483
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2484
                                          "sda", "sdb"):
2485
        msg = ("Could not run OS rename script for instance %s on node %s"
2486
               " (but the instance has been renamed in Ganeti)" %
2487
               (inst.name, inst.primary_node))
2488
        logger.Error(msg)
2489
    finally:
2490
      _ShutdownInstanceDisks(inst, self.cfg)
2491

    
2492

    
2493
class LURemoveInstance(LogicalUnit):
2494
  """Remove an instance.
2495

2496
  """
2497
  HPATH = "instance-remove"
2498
  HTYPE = constants.HTYPE_INSTANCE
2499
  _OP_REQP = ["instance_name", "ignore_failures"]
2500
  REQ_BGL = False
2501

    
2502
  def ExpandNames(self):
2503
    self._ExpandAndLockInstance()
2504
    self.needed_locks[locking.LEVEL_NODE] = []
2505
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2506

    
2507
  def DeclareLocks(self, level):
2508
    if level == locking.LEVEL_NODE:
2509
      self._LockInstancesNodes()
2510

    
2511
  def BuildHooksEnv(self):
2512
    """Build hooks env.
2513

2514
    This runs on master, primary and secondary nodes of the instance.
2515

2516
    """
2517
    env = _BuildInstanceHookEnvByObject(self.instance)
2518
    nl = [self.sstore.GetMasterNode()]
2519
    return env, nl, nl
2520

    
2521
  def CheckPrereq(self):
2522
    """Check prerequisites.
2523

2524
    This checks that the instance is in the cluster.
2525

2526
    """
2527
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2528
    assert self.instance is not None, \
2529
      "Cannot retrieve locked instance %s" % self.op.instance_name
2530

    
2531
  def Exec(self, feedback_fn):
2532
    """Remove the instance.
2533

2534
    """
2535
    instance = self.instance
2536
    logger.Info("shutting down instance %s on node %s" %
2537
                (instance.name, instance.primary_node))
2538

    
2539
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2540
      if self.op.ignore_failures:
2541
        feedback_fn("Warning: can't shutdown instance")
2542
      else:
2543
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2544
                                 (instance.name, instance.primary_node))
2545

    
2546
    logger.Info("removing block devices for instance %s" % instance.name)
2547

    
2548
    if not _RemoveDisks(instance, self.cfg):
2549
      if self.op.ignore_failures:
2550
        feedback_fn("Warning: can't remove instance's disks")
2551
      else:
2552
        raise errors.OpExecError("Can't remove instance's disks")
2553

    
2554
    logger.Info("removing instance %s out of cluster config" % instance.name)
2555

    
2556
    self.cfg.RemoveInstance(instance.name)
2557
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2558

    
2559

    
2560
class LUQueryInstances(NoHooksLU):
2561
  """Logical unit for querying instances.
2562

2563
  """
2564
  _OP_REQP = ["output_fields", "names"]
2565
  REQ_BGL = False
2566

    
2567
  def ExpandNames(self):
2568
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2569
    self.static_fields = frozenset([
2570
      "name", "os", "pnode", "snodes",
2571
      "admin_state", "admin_ram",
2572
      "disk_template", "ip", "mac", "bridge",
2573
      "sda_size", "sdb_size", "vcpus", "tags",
2574
      "network_port", "kernel_path", "initrd_path",
2575
      "hvm_boot_order", "hvm_acpi", "hvm_pae",
2576
      "hvm_cdrom_image_path", "hvm_nic_type",
2577
      "hvm_disk_type", "vnc_bind_address",
2578
      ])
2579
    _CheckOutputFields(static=self.static_fields,
2580
                       dynamic=self.dynamic_fields,
2581
                       selected=self.op.output_fields)
2582

    
2583
    self.needed_locks = {}
2584
    self.share_locks[locking.LEVEL_INSTANCE] = 1
2585
    self.share_locks[locking.LEVEL_NODE] = 1
2586

    
2587
    if self.op.names:
2588
      self.wanted = _GetWantedInstances(self, self.op.names)
2589
    else:
2590
      self.wanted = locking.ALL_SET
2591

    
2592
    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
2593
    if self.do_locking:
2594
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2595
      self.needed_locks[locking.LEVEL_NODE] = []
2596
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2597

    
2598
  def DeclareLocks(self, level):
2599
    if level == locking.LEVEL_NODE and self.do_locking:
2600
      self._LockInstancesNodes()
2601

    
2602
  def CheckPrereq(self):
2603
    """Check prerequisites.
2604

2605
    """
2606
    pass
2607

    
2608
  def Exec(self, feedback_fn):
2609
    """Computes the list of nodes and their attributes.
2610

2611
    """
2612
    all_info = self.cfg.GetAllInstancesInfo()
2613
    if self.do_locking:
2614
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2615
    elif self.wanted != locking.ALL_SET:
2616
      instance_names = self.wanted
2617
      missing = set(instance_names).difference(all_info.keys())
2618
      if missing:
2619
        raise self.OpExecError(
2620
          "Some instances were removed before retrieving their data: %s"
2621
          % missing)
2622
    else:
2623
      instance_names = all_info.keys()
2624
    instance_list = [all_info[iname] for iname in instance_names]
2625

    
2626
    # begin data gathering
2627

    
2628
    nodes = frozenset([inst.primary_node for inst in instance_list])
2629

    
2630
    bad_nodes = []
2631
    if self.dynamic_fields.intersection(self.op.output_fields):
2632
      live_data = {}
2633
      node_data = rpc.call_all_instances_info(nodes)
2634
      for name in nodes:
2635
        result = node_data[name]
2636
        if result:
2637
          live_data.update(result)
2638
        elif result == False:
2639
          bad_nodes.append(name)
2640
        # else no instance is alive
2641
    else:
2642
      live_data = dict([(name, {}) for name in instance_names])
2643

    
2644
    # end data gathering
2645

    
2646
    output = []
2647
    for instance in instance_list:
2648
      iout = []
2649
      for field in self.op.output_fields:
2650
        if field == "name":
2651
          val = instance.name
2652
        elif field == "os":
2653
          val = instance.os
2654
        elif field == "pnode":
2655
          val = instance.primary_node
2656
        elif field == "snodes":
2657
          val = list(instance.secondary_nodes)
2658
        elif field == "admin_state":
2659
          val = (instance.status != "down")
2660
        elif field == "oper_state":
2661
          if instance.primary_node in bad_nodes:
2662
            val = None
2663
          else:
2664
            val = bool(live_data.get(instance.name))
2665
        elif field == "status":
2666
          if instance.primary_node in bad_nodes:
2667
            val = "ERROR_nodedown"
2668
          else:
2669
            running = bool(live_data.get(instance.name))
2670
            if running:
2671
              if instance.status != "down":
2672
                val = "running"
2673
              else:
2674
                val = "ERROR_up"
2675
            else:
2676
              if instance.status != "down":
2677
                val = "ERROR_down"
2678
              else:
2679
                val = "ADMIN_down"
2680
        elif field == "admin_ram":
2681
          val = instance.memory
2682
        elif field == "oper_ram":
2683
          if instance.primary_node in bad_nodes:
2684
            val = None
2685
          elif instance.name in live_data:
2686
            val = live_data[instance.name].get("memory", "?")
2687
          else:
2688
            val = "-"
2689
        elif field == "disk_template":
2690
          val = instance.disk_template
2691
        elif field == "ip":
2692
          val = instance.nics[0].ip
2693
        elif field == "bridge":
2694
          val = instance.nics[0].bridge
2695
        elif field == "mac":
2696
          val = instance.nics[0].mac
2697
        elif field == "sda_size" or field == "sdb_size":
2698
          disk = instance.FindDisk(field[:3])
2699
          if disk is None:
2700
            val = None
2701
          else:
2702
            val = disk.size
2703
        elif field == "vcpus":
2704
          val = instance.vcpus
2705
        elif field == "tags":
2706
          val = list(instance.GetTags())
2707
        elif field in ("network_port", "kernel_path", "initrd_path",
2708
                       "hvm_boot_order", "hvm_acpi", "hvm_pae",
2709
                       "hvm_cdrom_image_path", "hvm_nic_type",
2710
                       "hvm_disk_type", "vnc_bind_address"):
2711
          val = getattr(instance, field, None)
2712
          if val is not None:
2713
            pass
2714
          elif field in ("hvm_nic_type", "hvm_disk_type",
2715
                         "kernel_path", "initrd_path"):
2716
            val = "default"
2717
          else:
2718
            val = "-"
2719
        else:
2720
          raise errors.ParameterError(field)
2721
        iout.append(val)
2722
      output.append(iout)
2723

    
2724
    return output
2725

    
2726

    
2727
class LUFailoverInstance(LogicalUnit):
2728
  """Failover an instance.
2729

2730
  """
2731
  HPATH = "instance-failover"
2732
  HTYPE = constants.HTYPE_INSTANCE
2733
  _OP_REQP = ["instance_name", "ignore_consistency"]
2734
  REQ_BGL = False
2735

    
2736
  def ExpandNames(self):
2737
    self._ExpandAndLockInstance()
2738
    self.needed_locks[locking.LEVEL_NODE] = []
2739
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2740

    
2741
  def DeclareLocks(self, level):
2742
    if level == locking.LEVEL_NODE:
2743
      self._LockInstancesNodes()
2744

    
2745
  def BuildHooksEnv(self):
2746
    """Build hooks env.
2747

2748
    This runs on master, primary and secondary nodes of the instance.
2749

2750
    """
2751
    env = {
2752
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2753
      }
2754
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2755
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2756
    return env, nl, nl
2757

    
2758
  def CheckPrereq(self):
2759
    """Check prerequisites.
2760

2761
    This checks that the instance is in the cluster.
2762

2763
    """
2764
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2765
    assert self.instance is not None, \
2766
      "Cannot retrieve locked instance %s" % self.op.instance_name
2767

    
2768
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2769
      raise errors.OpPrereqError("Instance's disk layout is not"
2770
                                 " network mirrored, cannot failover.")
2771

    
2772
    secondary_nodes = instance.secondary_nodes
2773
    if not secondary_nodes:
2774
      raise errors.ProgrammerError("no secondary node but using "
2775
                                   "a mirrored disk template")
2776

    
2777
    target_node = secondary_nodes[0]
2778
    # check memory requirements on the secondary node
2779
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2780
                         instance.name, instance.memory)
2781

    
2782
    # check bridge existance
2783
    brlist = [nic.bridge for nic in instance.nics]
2784
    if not rpc.call_bridges_exist(target_node, brlist):
2785
      raise errors.OpPrereqError("One or more target bridges %s does not"
2786
                                 " exist on destination node '%s'" %
2787
                                 (brlist, target_node))
2788

    
2789
  def Exec(self, feedback_fn):
2790
    """Failover an instance.
2791

2792
    The failover is done by shutting it down on its present node and
2793
    starting it on the secondary.
2794

2795
    """
2796
    instance = self.instance
2797

    
2798
    source_node = instance.primary_node
2799
    target_node = instance.secondary_nodes[0]
2800

    
2801
    feedback_fn("* checking disk consistency between source and target")
2802
    for dev in instance.disks:
2803
      # for drbd, these are drbd over lvm
2804
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2805
        if instance.status == "up" and not self.op.ignore_consistency:
2806
          raise errors.OpExecError("Disk %s is degraded on target node,"
2807
                                   " aborting failover." % dev.iv_name)
2808

    
2809
    feedback_fn("* shutting down instance on source node")
2810
    logger.Info("Shutting down instance %s on node %s" %
2811
                (instance.name, source_node))
2812

    
2813
    if not rpc.call_instance_shutdown(source_node, instance):
2814
      if self.op.ignore_consistency:
2815
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2816
                     " anyway. Please make sure node %s is down"  %
2817
                     (instance.name, source_node, source_node))
2818
      else:
2819
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2820
                                 (instance.name, source_node))
2821

    
2822
    feedback_fn("* deactivating the instance's disks on source node")
2823
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2824
      raise errors.OpExecError("Can't shut down the instance's disks.")
2825

    
2826
    instance.primary_node = target_node
2827
    # distribute new instance config to the other nodes
2828
    self.cfg.Update(instance)
2829

    
2830
    # Only start the instance if it's marked as up
2831
    if instance.status == "up":
2832
      feedback_fn("* activating the instance's disks on target node")
2833
      logger.Info("Starting instance %s on node %s" %
2834
                  (instance.name, target_node))
2835

    
2836
      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2837
                                               ignore_secondaries=True)
2838
      if not disks_ok:
2839
        _ShutdownInstanceDisks(instance, self.cfg)
2840
        raise errors.OpExecError("Can't activate the instance's disks")
2841

    
2842
      feedback_fn("* starting the instance on the target node")
2843
      if not rpc.call_instance_start(target_node, instance, None):
2844
        _ShutdownInstanceDisks(instance, self.cfg)
2845
        raise errors.OpExecError("Could not start instance %s on node %s." %
2846
                                 (instance.name, target_node))
2847

    
2848

    
2849
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2850
  """Create a tree of block devices on the primary node.
2851

2852
  This always creates all devices.
2853

2854
  """
2855
  if device.children:
2856
    for child in device.children:
2857
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2858
        return False
2859

    
2860
  cfg.SetDiskID(device, node)
2861
  new_id = rpc.call_blockdev_create(node, device, device.size,
2862
                                    instance.name, True, info)
2863
  if not new_id:
2864
    return False
2865
  if device.physical_id is None:
2866
    device.physical_id = new_id
2867
  return True
2868

    
2869

    
2870
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2871
  """Create a tree of block devices on a secondary node.
2872

2873
  If this device type has to be created on secondaries, create it and
2874
  all its children.
2875

2876
  If not, just recurse to children keeping the same 'force' value.
2877

2878
  """
2879
  if device.CreateOnSecondary():
2880
    force = True
2881
  if device.children:
2882
    for child in device.children:
2883
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2884
                                        child, force, info):
2885
        return False
2886

    
2887
  if not force:
2888
    return True
2889
  cfg.SetDiskID(device, node)
2890
  new_id = rpc.call_blockdev_create(node, device, device.size,
2891
                                    instance.name, False, info)
2892
  if not new_id:
2893
    return False
2894
  if device.physical_id is None:
2895
    device.physical_id = new_id
2896
  return True
2897

    
2898

    
2899
def _GenerateUniqueNames(cfg, exts):
2900
  """Generate a suitable LV name.
2901

2902
  This will generate a logical volume name for the given instance.
2903

2904
  """
2905
  results = []
2906
  for val in exts:
2907
    new_id = cfg.GenerateUniqueID()
2908
    results.append("%s%s" % (new_id, val))
2909
  return results
2910

    
2911

    
2912
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2913
  """Generate a drbd8 device complete with its children.
2914

2915
  """
2916
  port = cfg.AllocatePort()
2917
  vgname = cfg.GetVGName()
2918
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2919
                          logical_id=(vgname, names[0]))
2920
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2921
                          logical_id=(vgname, names[1]))
2922
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2923
                          logical_id = (primary, secondary, port),
2924
                          children = [dev_data, dev_meta],
2925
                          iv_name=iv_name)
2926
  return drbd_dev
2927

    
2928

    
2929
def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
    disks = []
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name="sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name="sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                        disk_sz, names[0:2], "sda")
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                        swap_sz, names[2:4], "sdb")
    disks = [drbd_sda_dev, drbd_sdb_dev]
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
                                iv_name="sda", logical_id=(file_driver,
                                "%s/sda" % file_storage_dir))
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
                                iv_name="sdb", logical_id=(file_driver,
                                "%s/sdb" % file_storage_dir))
    disks = [file_sda_dev, file_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks


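# Illustrative sketch (hypothetical, never called): for the drbd8 template
# built above, each of the two mirrored devices ("sda" and "sdb") is backed
# by a data LV and a small metadata LV whose names follow the
# "<unique id>.<iv_name>_<data|meta>" pattern. The real code draws a fresh
# unique ID from the configuration for every volume; a single caller-supplied
# id is reused here only to keep the example short.
def _SketchDrbd8LvNames(unique_id):
  """Return the LV name pairs a drbd8 disk layout would use."""
  result = {}
  for iv_name in ("sda", "sdb"):
    result[iv_name] = ["%s.%s_%s" % (unique_id, iv_name, suffix)
                       for suffix in ("data", "meta")]
  return result

# _SketchDrbd8LvNames("0d7cbe3c") ==>
#   {'sda': ['0d7cbe3c.sda_data', '0d7cbe3c.sda_meta'],
#    'sdb': ['0d7cbe3c.sdb_data', '0d7cbe3c.sdb_meta']}

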
def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = rpc.call_file_storage_dir_create(instance.primary_node,
                                              file_storage_dir)

    if not result:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False

    if not result[0]:
      logger.Error("failed to create directory '%s'" % file_storage_dir)
      return False

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False

  return True


def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
                                            file_storage_dir):
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      result = False

  return result


def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group.

  This is currently hard-coded for the two-drive layout.

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    # 256 MB are added for drbd metadata, 128 MB for each drbd device
    constants.DT_DRBD8: disk_size + swap_size + 256,
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]


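# Illustrative example (not used at runtime): the requirements computed by
# _ComputeDiskSize above for a hypothetical 10 GiB data disk plus 1 GiB swap.
def _SketchDiskSizeExamples():
  """Return the per-template volume group requirements for 10240/1024 MiB."""
  return dict([(name, _ComputeDiskSize(name, 10240, 1024))
               for name in (constants.DT_DISKLESS, constants.DT_PLAIN,
                            constants.DT_DRBD8, constants.DT_FILE)])

# The result is {diskless: None, plain: 11264, drbd8: 11520, file: None}:
# templates that do not live in the volume group need no space there, while
# drbd8 adds 256 MiB on top of the plain layout for its two metadata volumes.

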
class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync", "ip_check", "mac"]
  REQ_BGL = False

  def _ExpandNode(self, node):
    """Expands and checks one node name.

    """
    node_full = self.cfg.ExpandNodeName(node)
    if node_full is None:
      raise errors.OpPrereqError("Unknown node %s" % node)
    return node_full

  def ExpandNames(self):
    """ExpandNames for CreateInstance.

    Figure out the right locks for instance creation.

    """
    self.needed_locks = {}

    # set optional parameters to None if they don't exist
    for attr in ["kernel_path", "initrd_path", "pnode", "snode",
                 "iallocator", "hvm_boot_order", "hvm_acpi", "hvm_pae",
                 "hvm_cdrom_image_path", "hvm_nic_type", "hvm_disk_type",
                 "vnc_bind_address"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    # verify creation mode
    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)
    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    #### instance parameters check

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)
    self.op.instance_name = instance_name = hostname1.name

    # this is just a preventive check, but someone might still add this
    # instance in the meantime, and creation will fail at lock-add time
    if instance_name in self.cfg.GetInstanceList():
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    self.add_locks[locking.LEVEL_INSTANCE] = instance_name

    # ip validity checks
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = self.op.ip = inst_ip
    # used in CheckPrereq for ip ping check
    self.check_ip = hostname1.ip

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")
    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory path not absolute")

    ### Node/iallocator related checks
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.op.pnode = self._ExpandNode(self.op.pnode)
      nodelist = [self.op.pnode]
      if self.op.snode is not None:
        self.op.snode = self._ExpandNode(self.op.snode)
        nodelist.append(self.op.snode)
      self.needed_locks[locking.LEVEL_NODE] = nodelist

    # in case of import lock the source node too
    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)

      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      self.op.src_node = src_node = self._ExpandNode(src_node)
      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
        self.needed_locks[locking.LEVEL_NODE].append(src_node)

    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    disks = [{"size": self.op.disk_size, "mode": "w"},
             {"size": self.op.swap_size, "mode": "w"}]
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
             "bridge": self.op.bridge}]
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.op.vcpus,
                     mem_size=self.op.mem_size,
                     disks=disks,
                     nics=nics,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    logger.ToStdout("Selected nodes for the instance: %s" %
                    (", ".join(ial.nodes),))
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
                (self.op.instance_name, self.op.iallocator, ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    ))

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = self.op.src_node
      src_path = self.op.src_path

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage

    # ip ping checks (we use the same ip that was resolved in ExpandNames)

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (self.check_ip, self.op.instance_name))

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    #### allocator run

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
    assert self.pnode is not None, \
      "Cannot retrieve locked node %s" % self.op.pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if self.op.snode is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")
      if self.op.snode == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(self.op.snode)

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node" % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")

    # bridge check on primary node
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.op.mem_size)

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      # FIXME (als): shouldn't these checks happen on the destination node?
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    # Xen HVM device type checks
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
        raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
                                   " hypervisor" % self.op.hvm_nic_type)
      if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
        raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
                                   " hypervisor" % self.op.hvm_disk_type)

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    if self.op.vnc_bind_address is None:
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.sstore.GetFileStorageDir(),
                                        string_file_storage_dir, instance))

    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            hvm_acpi=self.op.hvm_acpi,
                            hvm_pae=self.op.hvm_pae,
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
                            vnc_bind_address=self.op.vnc_bind_address,
                            hvm_nic_type=self.op.hvm_nic_type,
                            hvm_disk_type=self.op.hvm_disk_type,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Declare that we don't want to remove the instance lock anymore, as we've
    # added the instance to the config
    del self.remove_locks[locking.LEVEL_INSTANCE]

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      # Make sure the instance lock gets removed
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                           src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


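# Reference sketch (hypothetical helper, not called by the LU): the order of
# the high-level steps performed by LUCreateInstance.Exec above, returned as
# data so the control flow is easy to eyeball. Two details are glossed over
# here: the one-shot mirror check only applies to net-mirrored templates, and
# diskless instances skip the OS scripts entirely.
def _SketchCreateInstanceSteps(mode, start, wait_for_sync):
  """Return the step names of an instance creation, in execution order."""
  steps = ["build NIC and allocate network port", "generate disk layout",
           "create disks (rolled back on failure)",
           "add instance to the cluster config"]
  if wait_for_sync:
    steps.append("wait for full disk sync")
  else:
    steps.append("one-shot mirror status check")
  if mode == constants.INSTANCE_IMPORT:
    steps.append("run the OS import scripts")
  else:
    steps.append("run the OS create scripts")
  if start:
    steps.append("start the instance")
  return steps

# e.g. _SketchCreateInstanceSteps(constants.INSTANCE_CREATE, True, False)
# ends with the OS create scripts followed by the instance start.

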
class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Connect to the console of an instance.

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    elif self.op.remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.op.remote_node = remote_node
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    # If we're not already locking all nodes in the set we have to declare the
    # instance's primary/secondary nodes.
    if (level == locking.LEVEL_NODE and
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
      self._LockInstancesNodes()

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    logger.ToStdout("Selected new secondary for the instance: %s" %
                    self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node == instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s on %s" % (dev.iv_name, pri_node))
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find device %s on node %s" %
                                 (dev.iv_name, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      size = dev.size
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))

      iv_names[dev.iv_name] = (dev, dev.children)

    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for dev in instance.disks:
      size = dev.size
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
      # create new devices on new_node
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=(pri_node, new_node,
                                          dev.logical_id[2]),
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new DRBD on"
                                 " node '%s'" % new_node)

    for dev in instance.disks:
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for %s on old node" % dev.iv_name)
      cfg.SetDiskID(dev, old_node)
      if not rpc.call_blockdev_shutdown(old_node, dev):
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    done = 0
    for dev in instance.disks:
      cfg.SetDiskID(dev, pri_node)
      # set the physical (unique in bdev terms) id to None, meaning
      # detach from network
      dev.physical_id = (None,) * len(dev.physical_id)
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
      if rpc.call_blockdev_find(pri_node, dev):
        done += 1
      else:
        warning("Failed to detach drbd %s from network, unusual case" %
                dev.iv_name)

    if not done:
      # no detaches succeeded (very unlikely)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev in instance.disks:
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    failures = []
    for dev in instance.disks:
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
      # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      # is correct
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
                hint="please do a gnt-instance info to see the status"
                " of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        if not rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance

    # Activate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      _StartInstanceDisks(self.cfg, instance, True)

    if instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")

    ret = fn(feedback_fn)

    # Deactivate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      _SafeShutdownInstanceDisks(instance, self.cfg)

    return ret


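# Stand-alone illustration (hypothetical, no RPC involved): the heart of the
# single-node replacement above is a rename swap in which the freshly created
# LVs take over the names of the old ones, while the old LVs are parked under
# a timestamped "_replaced-<suffix>" name until step 6 removes them. The
# sketch below performs the same swap on plain strings.
def _SketchLvRenameSwap(old_names, temp_suffix):
  """Return (parked_old_names, final_new_names) for a replace-disk run."""
  parked_old = ["%s_replaced-%s" % (name, temp_suffix) for name in old_names]
  # the new LVs inherit the original names, so the DRBD device definition
  # itself does not have to change
  final_new = list(old_names)
  return parked_old, final_new

# _SketchLvRenameSwap(["xenvg/abc.sda_data"], 1199145600) ==>
#   (["xenvg/abc.sda_data_replaced-1199145600"], ["xenvg/abc.sda_data"])

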
class LUGrowDisk(LogicalUnit):
4137
  """Grow a disk of an instance.
4138

4139
  """
4140
  HPATH = "disk-grow"
4141
  HTYPE = constants.HTYPE_INSTANCE
4142
  _OP_REQP = ["instance_name", "disk", "amount"]
4143
  REQ_BGL = False
4144

    
4145
  def ExpandNames(self):
4146
    self._ExpandAndLockInstance()
4147
    self.needed_locks[locking.LEVEL_NODE] = []
4148
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4149

    
4150
  def DeclareLocks(self, level):
4151
    if level == locking.LEVEL_NODE:
4152
      self._LockInstancesNodes()
4153

    
4154
  def BuildHooksEnv(self):
4155
    """Build hooks env.
4156

4157
    This runs on the master, the primary and all the secondaries.
4158

4159
    """
4160
    env = {
4161
      "DISK": self.op.disk,
4162
      "AMOUNT": self.op.amount,
4163
      }
4164
    env.update(_BuildInstanceHookEnvByObject(self.instance))
4165
    nl = [
4166
      self.sstore.GetMasterNode(),
4167
      self.instance.primary_node,
4168
      ]
4169
    return env, nl, nl
4170

    
4171
  def CheckPrereq(self):
4172
    """Check prerequisites.
4173

4174
    This checks that the instance is in the cluster.
4175

4176
    """
4177
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4178
    assert instance is not None, \
4179
      "Cannot retrieve locked instance %s" % self.op.instance_name
4180

    
4181
    self.instance = instance
4182

    
4183
    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4184
      raise errors.OpPrereqError("Instance's disk layout does not support"
4185
                                 " growing.")
4186

    
4187
    if instance.FindDisk(self.op.disk) is None:
4188
      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4189
                                 (self.op.disk, instance.name))
4190

    
4191
    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4192
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
4193
    for node in nodenames:
4194
      info = nodeinfo.get(node, None)
4195
      if not info:
4196
        raise errors.OpPrereqError("Cannot get current information"
4197
                                   " from node '%s'" % node)
4198
      vg_free = info.get('vg_free', None)
4199
      if not isinstance(vg_free, int):
4200
        raise errors.OpPrereqError("Can't compute free disk space on"
4201
                                   " node %s" % node)
4202
      if self.op.amount > info['vg_free']:
4203
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
4204
                                   " %d MiB available, %d MiB required" %
4205
                                   (node, info['vg_free'], self.op.amount))
4206

    
4207
  def Exec(self, feedback_fn):
4208
    """Execute disk grow.
4209

4210
    """
4211
    instance = self.instance
4212
    disk = instance.FindDisk(self.op.disk)
4213
    for node in (instance.secondary_nodes + (instance.primary_node,)):
4214
      self.cfg.SetDiskID(disk, node)
4215
      result = rpc.call_blockdev_grow(node, disk, self.op.amount)
4216
      if not result or not isinstance(result, (list, tuple)) or len(result) != 2:
4217
        raise errors.OpExecError("grow request failed to node %s" % node)
4218
      elif not result[0]:
4219
        raise errors.OpExecError("grow request failed to node %s: %s" %
4220
                                 (node, result[1]))
4221
    disk.RecordGrow(self.op.amount)
4222
    self.cfg.Update(instance)
4223
    return
4224

    
4225

    
4226
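# A minimal usage sketch for LUGrowDisk above (illustrative only, never called
# in this module).  It assumes an opcodes.OpGrowDisk opcode exists whose
# required fields mirror _OP_REQP above; the helper name and the example
# instance name are hypothetical.
def _ExampleGrowDiskOpcode():
  """Build an opcode growing disk 'sda' of an instance by 1024 MiB."""
  return opcodes.OpGrowDisk(instance_name="instance1.example.com",
                            disk="sda", amount=1024)

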
class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = self.cfg.ExpandInstanceName(name)
        if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" % name)
        self.wanted_names.append(full_name)
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
    else:
      self.wanted_names = None
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]
    return

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                           instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "vcpus": instance.vcpus,
        }

      htkind = self.sstore.GetHypervisorType()
      if htkind == constants.HT_XEN_PVM30:
        idict["kernel_path"] = instance.kernel_path
        idict["initrd_path"] = instance.initrd_path

      if htkind == constants.HT_XEN_HVM31:
        idict["hvm_boot_order"] = instance.hvm_boot_order
        idict["hvm_acpi"] = instance.hvm_acpi
        idict["hvm_pae"] = instance.hvm_pae
        idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
        idict["hvm_nic_type"] = instance.hvm_nic_type
        idict["hvm_disk_type"] = instance.hvm_disk_type

      if htkind in constants.HTS_REQ_PORT:
        if instance.vnc_bind_address is None:
          vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
        else:
          vnc_bind_address = instance.vnc_bind_address
        if instance.network_port is None:
          vnc_console_port = None
        elif vnc_bind_address == constants.BIND_ADDRESS_GLOBAL:
          vnc_console_port = "%s:%s" % (instance.primary_node,
                                        instance.network_port)
        elif vnc_bind_address == constants.LOCALHOST_IP_ADDRESS:
          vnc_console_port = "%s:%s on node %s" % (vnc_bind_address,
                                                   instance.network_port,
                                                   instance.primary_node)
        else:
          vnc_console_port = "%s:%s" % (instance.vnc_bind_address,
                                        instance.network_port)
        idict["vnc_console_port"] = vnc_console_port
        idict["vnc_bind_address"] = vnc_bind_address
        idict["network_port"] = instance.network_port

      result[instance.name] = idict

    return result


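# Illustrative sketch of the structure returned by LUQueryInstanceData.Exec
# above: each instance name maps to a dict with the keys built in Exec, plus
# hypervisor-specific extras.  All values below are invented for the example.
_EXAMPLE_INSTANCE_DATA = {
  "instance1.example.com": {
    "name": "instance1.example.com",
    "config_state": "up",
    "run_state": "up",
    "pnode": "node1.example.com",
    "snodes": ["node2.example.com"],
    "os": "debian-etch",
    "memory": 512,
    "nics": [("aa:00:00:11:22:33", "192.0.2.10", "xen-br0")],
    "disks": [],  # list of dicts as built by _ComputeDiskStatus
    "vcpus": 1,
    },
  }

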
class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge or self.mac:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      if self.mac:
        mac = self.mac
      else:
        mac = self.instance.nics[0].mac
      args['nics'] = [(ip, bridge, mac)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    # FIXME: all the parameters could be checked before, in ExpandNames, or in
    # a separate CheckArguments function, if we implement one, so the operation
    # can be aborted without waiting for any lock, should it have an error...
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
    self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
    self.hvm_pae = getattr(self.op, "hvm_pae", None)
    self.hvm_nic_type = getattr(self.op, "hvm_nic_type", None)
    self.hvm_disk_type = getattr(self.op, "hvm_disk_type", None)
    self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
    self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
    self.force = getattr(self.op, "force", None)
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
                 self.kernel_path, self.initrd_path, self.hvm_boot_order,
                 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
                 self.vnc_bind_address, self.hvm_nic_type, self.hvm_disk_type]
    if all_parms.count(None) == len(all_parms):
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    if self.kernel_path is not None:
      self.do_kernel_path = True
      if self.kernel_path == constants.VALUE_NONE:
        raise errors.OpPrereqError("Can't set instance to no kernel")

      if self.kernel_path != constants.VALUE_DEFAULT:
        if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
                                     " filename")
    else:
      self.do_kernel_path = False

    if self.initrd_path is not None:
      self.do_initrd_path = True
      if self.initrd_path not in (constants.VALUE_NONE,
                                  constants.VALUE_DEFAULT):
        if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
                                     " filename")
    else:
      self.do_initrd_path = False

    # boot order verification
    if self.hvm_boot_order is not None:
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
        if len(self.hvm_boot_order.strip("acdn")) != 0:
          raise errors.OpPrereqError("invalid boot order specified,"
                                     " must be one or more of [acdn]"
                                     " or 'default'")

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      if not (os.path.isabs(self.op.hvm_cdrom_image_path) or
              self.op.hvm_cdrom_image_path.lower() == "none"):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not (os.path.isfile(self.op.hvm_cdrom_image_path) or
              self.op.hvm_cdrom_image_path.lower() == "none"):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.warn = []
    if self.mem is not None and not self.force:
      pnode = self.instance.primary_node
      nodelist = [pnode]
      nodelist.extend(instance.secondary_nodes)
      instance_info = rpc.call_instance_info(pnode, instance.name)
      nodeinfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

      if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s" % pnode)
      else:
        if instance_info:
          current_mem = instance_info['memory']
        else:
          # Assume instance not running
          # (there is a slight race condition here, but it's not very probable,
          # and we have no other way to check)
          current_mem = 0
        miss_mem = self.mem - current_mem - nodeinfo[pnode]['memory_free']
        if miss_mem > 0:
          raise errors.OpPrereqError("This change will prevent the instance"
                                     " from starting, due to %d MB of memory"
                                     " missing on its primary node" % miss_mem)

      for node in instance.secondary_nodes:
        if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
          self.warn.append("Can't get info from secondary node %s" % node)
        elif self.mem > nodeinfo[node]['memory_free']:
          self.warn.append("Not enough memory to failover instance to secondary"
                           " node %s" % node)

    # Xen HVM device type checks
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if self.op.hvm_nic_type is not None:
        if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
          raise errors.OpPrereqError("Invalid NIC type %s specified for Xen"
                                     " HVM hypervisor" % self.op.hvm_nic_type)
      if self.op.hvm_disk_type is not None:
        if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
          raise errors.OpPrereqError("Invalid disk type %s specified for Xen"
                                     " HVM hypervisor" % self.op.hvm_disk_type)

    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    # Process here the warnings from CheckPrereq, as we don't have a
    # feedback_fn there.
    for warn in self.warn:
      feedback_fn("WARNING: %s" % warn)

    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        instance.hvm_boot_order = None
      else:
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))
    if self.hvm_acpi is not None:
      instance.hvm_acpi = self.hvm_acpi
      result.append(("hvm_acpi", self.hvm_acpi))
    if self.hvm_pae is not None:
      instance.hvm_pae = self.hvm_pae
      result.append(("hvm_pae", self.hvm_pae))
    if self.hvm_nic_type is not None:
      instance.hvm_nic_type = self.hvm_nic_type
      result.append(("hvm_nic_type", self.hvm_nic_type))
    if self.hvm_disk_type is not None:
      instance.hvm_disk_type = self.hvm_disk_type
      result.append(("hvm_disk_type", self.hvm_disk_type))
    if self.hvm_cdrom_image_path:
      if self.hvm_cdrom_image_path == constants.VALUE_NONE:
        instance.hvm_cdrom_image_path = None
      else:
        instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
      result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
    if self.vnc_bind_address:
      instance.vnc_bind_address = self.vnc_bind_address
      result.append(("vnc_bind_address", self.vnc_bind_address))

    self.cfg.Update(instance)

    return result


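# Illustrative sketch of the value returned by LUSetInstanceParams.Exec above:
# a list of (parameter, new value) pairs, one per field that was actually
# changed.  The values below are invented for the example.
_EXAMPLE_MODIFY_RESULT = [
  ("mem", 1024),
  ("vcpus", 2),
  ("bridge", "xen-br1"),
  ]

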
class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = ['nodes']
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)


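# Illustrative sketch of the node->(export-list) structure described in the
# Exec docstring above: each queried node maps to the list of instance exports
# found on it.  Node and instance names are invented for the example.
_EXAMPLE_EXPORT_LIST = {
  "node1.example.com": ["instance1.example.com"],
  "node2.example.com": [],
  }

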
class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
          "Cannot retrieve locked instance %s" % self.op.instance_name

    self.dst_node = self.cfg.GetNodeInfo(
      self.cfg.ExpandNodeName(self.op.target_node))

    assert self.dst_node is not None, \
          "Cannot retrieve locked node %s" % self.op.target_node

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(instance, self.cfg)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
        logger.Error("could not export block device %s from node %s to node %s"
                     % (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from node %s" %
                     (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal;
    # if we proceed, the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      exportlist = rpc.call_export_list(nodelist)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.
    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    exportlist = rpc.call_export_list(self.acquired_locks[locking.LEVEL_NODE])
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
        if not rpc.call_export_remove(node, instance_name):
          logger.Error("could not remove export for instance %s"
                       " on node %s" % (instance_name, node))

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


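# A minimal sketch of driving the tag LUs above through an opcode (illustrative
# only, never called here).  It assumes an opcodes.OpAddTags opcode exists with
# the kind/name/tags fields required by LUAddTags' _OP_REQP; the helper name
# and the example values are hypothetical.
def _ExampleAddInstanceTagOpcode():
  """Build an opcode adding one tag to an instance."""
  return opcodes.OpAddTags(kind=constants.TAG_INSTANCE,
                           name="instance1.example.com",
                           tags=["critical"])

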
class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg/sstore that are needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, cfg, sstore, mode, name, **kwargs):
    self.cfg = cfg
    self.sstore = sstore
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    # cluster data
    data = {
      "version": 1,
      "cluster_name": self.sstore.GetClusterName(),
      "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      # we don't have job IDs
      }

    i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()
    node_data = rpc.call_node_info(node_list, cfg.GetVGName())
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      if nname not in node_data or not isinstance(node_data[nname], dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname]
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += iinfo.memory
          if iinfo.status == "up":
            i_p_up_mem += iinfo.memory

      # compute memory used by instances
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": iinfo.vcpus,
        "memory": iinfo.memory,
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template,
                                  self.disks[0]["size"], self.disks[1]["size"])

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1

    disk_space = _ComputeDiskSize(instance.disk_template,
                                  instance.disks[0].size,
                                  instance.disks[1].size)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
    """Run an instance allocator and return the results.

    """
    data = self.in_text

    result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)

    if not isinstance(result, (list, tuple)) or len(result) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict


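# A minimal usage sketch for the IAllocator class above (illustrative only,
# never called in this module); LUTestAllocator below drives it the same way.
# The cfg/sstore objects, the instance parameters and the external allocator
# script name ("dumb") are all assumptions supplied by the caller.
def _ExampleRunIAllocator(cfg, sstore):
  """Build an allocation request and run an external allocator script."""
  ial = IAllocator(cfg, sstore,
                   mode=constants.IALLOCATOR_MODE_ALLOC,
                   name="instance1.example.com",
                   mem_size=512,
                   disks=[{"size": 1024, "mode": "w"},
                          {"size": 4096, "mode": "w"}],
                   disk_template=constants.DT_DRBD8,
                   os="debian-etch",
                   tags=[],
                   nics=[{"mac": "auto", "ip": None, "bridge": "xen-br0"}],
                   vcpus=1)
  ial.Run("dumb")  # name of the external allocator script (assumed)
  return ial.success, ial.info, ial.nodes

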
class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    of the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       )
    else:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result