#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform
import logging
import copy

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_MASTER: the LU needs to run on the master node
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_MASTER = True
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    if not self.cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    if self.REQ_MASTER:
      master = self.cfg.GetMasterNode()
      if master != utils.HostInfo().name:
        raise errors.OpPrereqError("Commands must be run on the master"
                                   " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:
      - Use an empty dict if you don't need any lock
      - If you don't need any lock at a particular level omit that level
      - Don't put anything for the BGL level
      - If you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples:
    # Acquire all nodes and one instance
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
    }
    # Acquire just two nodes
    self.needed_locks = {
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
    }
    # Acquire no locks
    self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    If the hook should run on no nodes, an empty list (and not None)
    should be returned.

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: the previous result this LU had, or None in the PRE phase.

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to:

    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check whether we have really been called with the instance locks
    # held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None
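

# Illustrative sketch only (not part of Ganeti and not wired to any opcode):
# a minimal concurrent LU following the rules documented in LogicalUnit above.
# The opcode field used here (instance_name) and the behaviour are
# hypothetical; the point is the ExpandNames/DeclareLocks/_LockInstancesNodes
# pattern together with the REQ_BGL = False declaration.
class _LUExampleSketch(NoHooksLU):
  """Example-only LU that locks one instance and its nodes."""
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    # expands self.op.instance_name and declares the instance-level lock
    self._ExpandAndLockInstance()
    # node locks are computed later, once the instance lock is held
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # fill LEVEL_NODE with the locked instance's primary/secondary nodes
      self._LockInstancesNodes()

  def CheckPrereq(self):
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)

  def Exec(self, feedback_fn):
    return self.instance.primary_node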


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)
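
# Example (illustrative, hypothetical names): _GetWantedNodes(lu, ["node2",
# "node1"]) would return ["node1.example.com", "node2.example.com"], assuming
# lu.cfg.ExpandNodeName() resolves the short names to those FQDNs.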


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta))


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings
  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env
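
# Example (illustrative only, hypothetical values): for a single-NIC instance
# the function above produces roughly
#   {"OP_TARGET": "inst1.example.com", "INSTANCE_NAME": "inst1.example.com",
#    "INSTANCE_PRIMARY": "node1.example.com", "INSTANCE_SECONDARIES": "",
#    "INSTANCE_OS_TYPE": "debian-etch", "INSTANCE_STATUS": "up",
#    "INSTANCE_MEMORY": 512, "INSTANCE_VCPUS": 1, "INSTANCE_NIC_COUNT": 1,
#    "INSTANCE_NIC0_IP": "", "INSTANCE_NIC0_BRIDGE": "xen-br0",
#    "INSTANCE_NIC0_HWADDR": "aa:00:00:11:22:33"}
# The hooks runner later adds the GANETI_ prefix to each key (see
# LogicalUnit.BuildHooksEnv).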


def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override
  """
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _CheckInstanceBridgesExist(lu, instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    if not self.rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    if not node_result:
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
      return True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result['node-net-test'][node]))

    hyp_result = node_result.get('hypervisor', None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if not instanceconfig.status == 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to, should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase and their failure causes
    the output to be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = []
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
    all_vglist = self.rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': hypervisors,
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo]
      }
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())
    all_rversion = self.rpc.call_version(nodelist)
    all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
                                        self.cfg.GetHypervisorType())

    cluster = self.cfg.GetClusterInfo()
    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result =  self._VerifyInstance(instance, inst_config, node_volume,
                                     node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result, handle it, and send some
    nicely-formatted feedback back to the user.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: previous Exec result

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn("    Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
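    # The four containers built above form the result: res_nodes collects the
    # nodes that could not be queried, res_nlvm maps a node name to the LVM
    # error output it returned, res_instances lists instances that have at
    # least one inactive logical volume, and res_missing maps an instance name
    # to the (node, volume) pairs that were not found at all.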

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logging.warning("Connection to node %s failed or invalid data"
                        " returned", node)
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    if not self.rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      # TODO: sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logging.debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = self.rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            self.LogWarning("Copy of file %s to node %s failed",
                            fname, to_node)
    finally:
      if not self.rpc.call_node_start_master(master, False):
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    # FIXME: This only works because there is only one parameter that can be
    # changed or removed.
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # beparams changes do not need validation (we can't validate?),
    # but we still process here
    if self.op.beparams:
      self.new_beparams = cluster.FillDict(
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)

    # hypervisor list/parameters
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      if self.op.vg_name != self.cfg.GetVGName():
        self.cfg.SetVGName(self.op.vg_name)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.enabled_hypervisors is not None:
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
    self.cfg.Update(self.cluster)


def _WaitForSync(lu, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      lu.LogWarning("Can't get any data from node %s", node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                           node, instance.disks[i].iv_name)
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5
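  # idx selects the field checked in the status tuple returned by
  # call_blockdev_find: position 5 is taken to be the overall is_degraded
  # flag and position 6 the ldisk (local storage) flag, per the docstring
  # above; the full tuple layout is an assumption here.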
1300

    
1301
  result = True
1302
  if on_primary or dev.AssembleOnSecondary():
1303
    rstats = lu.rpc.call_blockdev_find(node, dev)
1304
    if not rstats:
1305
      logging.warning("Node %s: disk degraded, not found or node down", node)
1306
      result = False
1307
    else:
1308
      result = result and (not rstats[idx])
1309
  if dev.children:
1310
    for child in dev.children:
1311
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1312

    
1313
  return result
1314

    
1315

    
1316
class LUDiagnoseOS(NoHooksLU):
1317
  """Logical unit for OS diagnose/query.
1318

1319
  """
1320
  _OP_REQP = ["output_fields", "names"]
1321
  REQ_BGL = False
1322
  _FIELDS_STATIC = utils.FieldSet()
1323
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1324

    
1325
  def ExpandNames(self):
1326
    if self.op.names:
1327
      raise errors.OpPrereqError("Selective OS query not supported")
1328

    
1329
    _CheckOutputFields(static=self._FIELDS_STATIC,
1330
                       dynamic=self._FIELDS_DYNAMIC,
1331
                       selected=self.op.output_fields)
1332

    
1333
    # Lock all nodes, in shared mode
1334
    self.needed_locks = {}
1335
    self.share_locks[locking.LEVEL_NODE] = 1
1336
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1337

    
1338
  def CheckPrereq(self):
1339
    """Check prerequisites.
1340

1341
    """
1342

    
1343
  @staticmethod
1344
  def _DiagnoseByOS(node_list, rlist):
1345
    """Remaps a per-node return list into an a per-os per-node dictionary
1346

1347
      Args:
1348
        node_list: a list with the names of all nodes
1349
        rlist: a map with node names as keys and OS objects as values
1350

1351
      Returns:
1352
        map: a map with osnames as keys and as value another map, with
1353
             nodes as
1354
             keys and list of OS objects as values
1355
             e.g. {"debian-etch": {"node1": [<object>,...],
1356
                                   "node2": [<object>,]}
1357
                  }
1358

1359
    """
1360
    all_os = {}
1361
    for node_name, nr in rlist.iteritems():
1362
      if not nr:
1363
        continue
1364
      for os_obj in nr:
1365
        if os_obj.name not in all_os:
1366
          # build a list of nodes for this os containing empty lists
1367
          # for each node in node_list
1368
          all_os[os_obj.name] = {}
1369
          for nname in node_list:
1370
            all_os[os_obj.name][nname] = []
1371
        all_os[os_obj.name][node_name].append(os_obj)
1372
    return all_os
1373

    
1374
  def Exec(self, feedback_fn):
1375
    """Compute the list of OSes.
1376

1377
    """
1378
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1379
    node_data = self.rpc.call_os_diagnose(node_list)
1380
    if node_data == False:
1381
      raise errors.OpExecError("Can't gather the list of OSes")
1382
    pol = self._DiagnoseByOS(node_list, node_data)
1383
    output = []
1384
    for os_name, os_data in pol.iteritems():
1385
      row = []
1386
      for field in self.op.output_fields:
1387
        if field == "name":
1388
          val = os_name
1389
        elif field == "valid":
1390
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1391
        elif field == "node_status":
1392
          val = {}
1393
          for node_name, nos_list in os_data.iteritems():
1394
            val[node_name] = [(v.status, v.path) for v in nos_list]
1395
        else:
1396
          raise errors.ParameterError(field)
1397
        row.append(val)
1398
      output.append(row)
1399

    
1400
    return output
1401

    
1402

    
1403
class LURemoveNode(LogicalUnit):
1404
  """Logical unit for removing a node.
1405

1406
  """
1407
  HPATH = "node-remove"
1408
  HTYPE = constants.HTYPE_NODE
1409
  _OP_REQP = ["node_name"]
1410

    
1411
  def BuildHooksEnv(self):
1412
    """Build hooks env.
1413

1414
    This doesn't run on the target node in the pre phase as a failed
1415
    node would then be impossible to remove.
1416

1417
    """
1418
    env = {
1419
      "OP_TARGET": self.op.node_name,
1420
      "NODE_NAME": self.op.node_name,
1421
      }
1422
    all_nodes = self.cfg.GetNodeList()
1423
    all_nodes.remove(self.op.node_name)
1424
    return env, all_nodes, all_nodes
1425

    
1426
  def CheckPrereq(self):
1427
    """Check prerequisites.
1428

1429
    This checks:
1430
     - the node exists in the configuration
1431
     - it does not have primary or secondary instances
1432
     - it's not the master
1433

1434
    Any errors are signalled by raising errors.OpPrereqError.
1435

1436
    """
1437
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1438
    if node is None:
1439
      raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1440

    
1441
    instance_list = self.cfg.GetInstanceList()
1442

    
1443
    masternode = self.cfg.GetMasterNode()
1444
    if node.name == masternode:
1445
      raise errors.OpPrereqError("Node is the master node,"
1446
                                 " you need to failover first.")
1447

    
1448
    for instance_name in instance_list:
1449
      instance = self.cfg.GetInstanceInfo(instance_name)
1450
      if node.name == instance.primary_node:
1451
        raise errors.OpPrereqError("Instance %s still running on the node,"
1452
                                   " please remove first." % instance_name)
1453
      if node.name in instance.secondary_nodes:
1454
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1455
                                   " please remove first." % instance_name)
1456
    self.op.node_name = node.name
1457
    self.node = node
1458

    
1459
  def Exec(self, feedback_fn):
1460
    """Removes the node from the cluster.
1461

1462
    """
1463
    node = self.node
1464
    logging.info("Stopping the node daemon and removing configs from node %s",
1465
                 node.name)
1466

    
1467
    self.context.RemoveNode(node.name)
1468

    
1469
    self.rpc.call_node_leave_cluster(node.name)
1470

    
1471

    
1472
class LUQueryNodes(NoHooksLU):
1473
  """Logical unit for querying nodes.
1474

1475
  """
1476
  _OP_REQP = ["output_fields", "names"]
1477
  REQ_BGL = False
1478
  _FIELDS_DYNAMIC = utils.FieldSet(
1479
    "dtotal", "dfree",
1480
    "mtotal", "mnode", "mfree",
1481
    "bootid",
1482
    "ctotal",
1483
    )
1484

    
1485
  _FIELDS_STATIC = utils.FieldSet(
1486
    "name", "pinst_cnt", "sinst_cnt",
1487
    "pinst_list", "sinst_list",
1488
    "pip", "sip", "tags",
1489
    "serial_no",
1490
    )
1491

    
1492
  def ExpandNames(self):
1493
    _CheckOutputFields(static=self._FIELDS_STATIC,
1494
                       dynamic=self._FIELDS_DYNAMIC,
1495
                       selected=self.op.output_fields)
1496

    
1497
    self.needed_locks = {}
1498
    self.share_locks[locking.LEVEL_NODE] = 1
1499

    
1500
    if self.op.names:
1501
      self.wanted = _GetWantedNodes(self, self.op.names)
1502
    else:
1503
      self.wanted = locking.ALL_SET
1504

    
1505
    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1506
    if self.do_locking:
1507
      # if we don't request only static fields, we need to lock the nodes
1508
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
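    # when only static fields are requested, do_locking stays False: no
    # node locks are acquired and Exec serves the data purely from the
    # configuration, so the query cannot block on other operations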
1509

    
1510

    
1511
  def CheckPrereq(self):
1512
    """Check prerequisites.
1513

1514
    """
1515
    # The node list is validated by _GetWantedNodes if it is non-empty;
    # an empty list requires no validation
1517
    pass
1518

    
1519
  def Exec(self, feedback_fn):
1520
    """Computes the list of nodes and their attributes.
1521

1522
    """
1523
    all_info = self.cfg.GetAllNodesInfo()
1524
    if self.do_locking:
1525
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1526
    elif self.wanted != locking.ALL_SET:
1527
      nodenames = self.wanted
1528
      missing = set(nodenames).difference(all_info.keys())
1529
      if missing:
1530
        raise errors.OpExecError(
1531
          "Some nodes were removed before retrieving their data: %s" % missing)
1532
    else:
1533
      nodenames = all_info.keys()
1534

    
1535
    nodenames = utils.NiceSort(nodenames)
1536
    nodelist = [all_info[name] for name in nodenames]
1537

    
1538
    # begin data gathering
1539

    
1540
    if self.do_locking:
1541
      live_data = {}
1542
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1543
                                          self.cfg.GetHypervisorType())
1544
      for name in nodenames:
1545
        nodeinfo = node_data.get(name, None)
1546
        if nodeinfo:
1547
          live_data[name] = {
1548
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1549
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1550
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1551
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1552
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1553
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1554
            "bootid": nodeinfo['bootid'],
1555
            }
1556
        else:
1557
          live_data[name] = {}
1558
    else:
1559
      live_data = dict.fromkeys(nodenames, {})
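      # no live data is needed here: do_locking is False only when every
      # requested field is static, so the dynamic-field branch below is
      # never reached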
1560

    
1561
    node_to_primary = dict([(name, set()) for name in nodenames])
1562
    node_to_secondary = dict([(name, set()) for name in nodenames])
1563

    
1564
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1565
                             "sinst_cnt", "sinst_list"))
1566
    if inst_fields & frozenset(self.op.output_fields):
1567
      instancelist = self.cfg.GetInstanceList()
1568

    
1569
      for instance_name in instancelist:
1570
        inst = self.cfg.GetInstanceInfo(instance_name)
1571
        if inst.primary_node in node_to_primary:
1572
          node_to_primary[inst.primary_node].add(inst.name)
1573
        for secnode in inst.secondary_nodes:
1574
          if secnode in node_to_secondary:
1575
            node_to_secondary[secnode].add(inst.name)
1576

    
1577
    # end data gathering
1578

    
1579
    output = []
1580
    for node in nodelist:
1581
      node_output = []
1582
      for field in self.op.output_fields:
1583
        if field == "name":
1584
          val = node.name
1585
        elif field == "pinst_list":
1586
          val = list(node_to_primary[node.name])
1587
        elif field == "sinst_list":
1588
          val = list(node_to_secondary[node.name])
1589
        elif field == "pinst_cnt":
1590
          val = len(node_to_primary[node.name])
1591
        elif field == "sinst_cnt":
1592
          val = len(node_to_secondary[node.name])
1593
        elif field == "pip":
1594
          val = node.primary_ip
1595
        elif field == "sip":
1596
          val = node.secondary_ip
1597
        elif field == "tags":
1598
          val = list(node.GetTags())
1599
        elif field == "serial_no":
1600
          val = node.serial_no
1601
        elif self._FIELDS_DYNAMIC.Matches(field):
1602
          val = live_data[node.name].get(field, None)
1603
        else:
1604
          raise errors.ParameterError(field)
1605
        node_output.append(val)
1606
      output.append(node_output)
1607

    
1608
    return output
1609

    
1610

    
1611
class LUQueryNodeVolumes(NoHooksLU):
1612
  """Logical unit for getting volumes on node(s).
1613

1614
  """
1615
  _OP_REQP = ["nodes", "output_fields"]
1616
  REQ_BGL = False
1617
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1618
  _FIELDS_STATIC = utils.FieldSet("node")
1619

    
1620
  def ExpandNames(self):
1621
    _CheckOutputFields(static=self._FIELDS_STATIC,
1622
                       dynamic=self._FIELDS_DYNAMIC,
1623
                       selected=self.op.output_fields)
1624

    
1625
    self.needed_locks = {}
1626
    self.share_locks[locking.LEVEL_NODE] = 1
1627
    if not self.op.nodes:
1628
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1629
    else:
1630
      self.needed_locks[locking.LEVEL_NODE] = \
1631
        _GetWantedNodes(self, self.op.nodes)
1632

    
1633
  def CheckPrereq(self):
1634
    """Check prerequisites.
1635

1636
    This checks that the fields required are valid output fields.
1637

1638
    """
1639
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1640

    
1641
  def Exec(self, feedback_fn):
1642
    """Computes the list of nodes and their attributes.
1643

1644
    """
1645
    nodenames = self.nodes
1646
    volumes = self.rpc.call_node_volumes(nodenames)
1647

    
1648
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1649
             in self.cfg.GetInstanceList()]
1650

    
1651
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
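    # lv_by_node maps each instance to a dict of node -> list of LV names,
    # used below to resolve which instance (if any) owns a given volume
    # when the "instance" output field is requested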
1652

    
1653
    output = []
1654
    for node in nodenames:
1655
      if node not in volumes or not volumes[node]:
1656
        continue
1657

    
1658
      node_vols = volumes[node][:]
1659
      node_vols.sort(key=lambda vol: vol['dev'])
1660

    
1661
      for vol in node_vols:
1662
        node_output = []
1663
        for field in self.op.output_fields:
1664
          if field == "node":
1665
            val = node
1666
          elif field == "phys":
1667
            val = vol['dev']
1668
          elif field == "vg":
1669
            val = vol['vg']
1670
          elif field == "name":
1671
            val = vol['name']
1672
          elif field == "size":
1673
            val = int(float(vol['size']))
1674
          elif field == "instance":
1675
            for inst in ilist:
1676
              if node not in lv_by_node[inst]:
1677
                continue
1678
              if vol['name'] in lv_by_node[inst][node]:
1679
                val = inst.name
1680
                break
1681
            else:
1682
              val = '-'
1683
          else:
1684
            raise errors.ParameterError(field)
1685
          node_output.append(str(val))
1686

    
1687
        output.append(node_output)
1688

    
1689
    return output
1690

    
1691

    
1692
class LUAddNode(LogicalUnit):
1693
  """Logical unit for adding node to the cluster.
1694

1695
  """
1696
  HPATH = "node-add"
1697
  HTYPE = constants.HTYPE_NODE
1698
  _OP_REQP = ["node_name"]
1699

    
1700
  def BuildHooksEnv(self):
1701
    """Build hooks env.
1702

1703
    This will run on all nodes before, and on all nodes + the new node after.
1704

1705
    """
1706
    env = {
1707
      "OP_TARGET": self.op.node_name,
1708
      "NODE_NAME": self.op.node_name,
1709
      "NODE_PIP": self.op.primary_ip,
1710
      "NODE_SIP": self.op.secondary_ip,
1711
      }
1712
    nodes_0 = self.cfg.GetNodeList()
1713
    nodes_1 = nodes_0 + [self.op.node_name, ]
1714
    return env, nodes_0, nodes_1
1715

    
1716
  def CheckPrereq(self):
1717
    """Check prerequisites.
1718

1719
    This checks:
1720
     - the new node is not already in the config
1721
     - it is resolvable
1722
     - its parameters (single/dual homed) match the cluster
1723

1724
    Any errors are signalled by raising errors.OpPrereqError.
1725

1726
    """
1727
    node_name = self.op.node_name
1728
    cfg = self.cfg
1729

    
1730
    dns_data = utils.HostInfo(node_name)
1731

    
1732
    node = dns_data.name
1733
    primary_ip = self.op.primary_ip = dns_data.ip
1734
    secondary_ip = getattr(self.op, "secondary_ip", None)
1735
    if secondary_ip is None:
1736
      secondary_ip = primary_ip
1737
    if not utils.IsValidIP(secondary_ip):
1738
      raise errors.OpPrereqError("Invalid secondary IP given")
1739
    self.op.secondary_ip = secondary_ip
1740

    
1741
    node_list = cfg.GetNodeList()
1742
    if not self.op.readd and node in node_list:
1743
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1744
                                 node)
1745
    elif self.op.readd and node not in node_list:
1746
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1747

    
1748
    for existing_node_name in node_list:
1749
      existing_node = cfg.GetNodeInfo(existing_node_name)
1750

    
1751
      if self.op.readd and node == existing_node_name:
1752
        if (existing_node.primary_ip != primary_ip or
1753
            existing_node.secondary_ip != secondary_ip):
1754
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1755
                                     " address configuration as before")
1756
        continue
1757

    
1758
      if (existing_node.primary_ip == primary_ip or
1759
          existing_node.secondary_ip == primary_ip or
1760
          existing_node.primary_ip == secondary_ip or
1761
          existing_node.secondary_ip == secondary_ip):
1762
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1763
                                   " existing node %s" % existing_node.name)
1764

    
1765
    # check that the type of the node (single versus dual homed) is the
1766
    # same as for the master
1767
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1768
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1769
    newbie_singlehomed = secondary_ip == primary_ip
1770
    if master_singlehomed != newbie_singlehomed:
1771
      if master_singlehomed:
1772
        raise errors.OpPrereqError("The master has no private ip but the"
1773
                                   " new node has one")
1774
      else:
1775
        raise errors.OpPrereqError("The master has a private ip but the"
1776
                                   " new node doesn't have one")
1777

    
1778
    # check reachability
1779
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1780
      raise errors.OpPrereqError("Node not reachable by ping")
1781

    
1782
    if not newbie_singlehomed:
1783
      # check reachability from my secondary ip to newbie's secondary ip
1784
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1785
                           source=myself.secondary_ip):
1786
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1787
                                   " based ping to noded port")
1788

    
1789
    self.new_node = objects.Node(name=node,
1790
                                 primary_ip=primary_ip,
1791
                                 secondary_ip=secondary_ip)
1792

    
1793
  def Exec(self, feedback_fn):
1794
    """Adds the new node to the cluster.
1795

1796
    """
1797
    new_node = self.new_node
1798
    node = new_node.name
1799

    
1800
    # check connectivity
1801
    result = self.rpc.call_version([node])[node]
1802
    if result:
1803
      if constants.PROTOCOL_VERSION == result:
1804
        logging.info("Communication to node %s fine, sw version %s match",
1805
                     node, result)
1806
      else:
1807
        raise errors.OpExecError("Version mismatch master version %s,"
1808
                                 " node version %s" %
1809
                                 (constants.PROTOCOL_VERSION, result))
1810
    else:
1811
      raise errors.OpExecError("Cannot get version from the new node")
1812

    
1813
    # setup ssh on node
1814
    logging.info("Copy ssh key to node %s", node)
1815
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1816
    keyarray = []
1817
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1818
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1819
                priv_key, pub_key]
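    # the six files above are the node's SSH host key pairs (DSA and RSA)
    # plus the key pair of the GANETI_RUNAS user; their contents are read
    # locally and pushed to the new node via node_add below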
1820

    
1821
    for i in keyfiles:
1822
      f = open(i, 'r')
1823
      try:
1824
        keyarray.append(f.read())
1825
      finally:
1826
        f.close()
1827

    
1828
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
1829
                                    keyarray[2],
1830
                                    keyarray[3], keyarray[4], keyarray[5])
1831

    
1832
    if not result:
1833
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1834

    
1835
    # Add node to our /etc/hosts, and add key to known_hosts
1836
    utils.AddHostToEtcHosts(new_node.name)
1837

    
1838
    if new_node.secondary_ip != new_node.primary_ip:
1839
      if not self.rpc.call_node_has_ip_address(new_node.name,
1840
                                               new_node.secondary_ip):
1841
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1842
                                 " you gave (%s). Please fix and re-run this"
1843
                                 " command." % new_node.secondary_ip)
1844

    
1845
    node_verify_list = [self.cfg.GetMasterNode()]
1846
    node_verify_param = {
1847
      'nodelist': [node],
1848
      # TODO: do a node-net-test as well?
1849
    }
1850

    
1851
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
1852
                                       self.cfg.GetClusterName())
1853
    for verifier in node_verify_list:
1854
      if not result[verifier]:
1855
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
1856
                                 " for remote verification" % verifier)
1857
      if result[verifier]['nodelist']:
1858
        for failed in result[verifier]['nodelist']:
1859
          feedback_fn("ssh/hostname verification failed %s -> %s" %
1860
                      (verifier, result[verifier]['nodelist'][failed]))
1861
        raise errors.OpExecError("ssh/hostname verification failed.")
1862

    
1863
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1864
    # including the node just added
1865
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
1866
    dist_nodes = self.cfg.GetNodeList()
1867
    if not self.op.readd:
1868
      dist_nodes.append(node)
1869
    if myself.name in dist_nodes:
1870
      dist_nodes.remove(myself.name)
1871

    
1872
    logging.debug("Copying hosts and known_hosts to all nodes")
1873
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1874
      result = self.rpc.call_upload_file(dist_nodes, fname)
1875
      for to_node in dist_nodes:
1876
        if not result[to_node]:
1877
          logging.error("Copy of file %s to node %s failed", fname, to_node)
1878

    
1879
    to_copy = []
1880
    if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
1881
      to_copy.append(constants.VNC_PASSWORD_FILE)
1882
    for fname in to_copy:
1883
      result = self.rpc.call_upload_file([node], fname)
1884
      if not result[node]:
1885
        logging.error("Could not copy file %s to node %s", fname, node)
1886

    
1887
    if self.op.readd:
1888
      self.context.ReaddNode(new_node)
1889
    else:
1890
      self.context.AddNode(new_node)


class LUQueryClusterInfo(NoHooksLU):
1894
  """Query cluster configuration.
1895

1896
  """
1897
  _OP_REQP = []
1898
  REQ_MASTER = False
1899
  REQ_BGL = False
1900

    
1901
  def ExpandNames(self):
1902
    self.needed_locks = {}
1903

    
1904
  def CheckPrereq(self):
1905
    """No prerequsites needed for this LU.
1906

1907
    """
1908
    pass
1909

    
1910
  def Exec(self, feedback_fn):
1911
    """Return cluster config.
1912

1913
    """
1914
    cluster = self.cfg.GetClusterInfo()
1915
    result = {
1916
      "software_version": constants.RELEASE_VERSION,
1917
      "protocol_version": constants.PROTOCOL_VERSION,
1918
      "config_version": constants.CONFIG_VERSION,
1919
      "os_api_version": constants.OS_API_VERSION,
1920
      "export_version": constants.EXPORT_VERSION,
1921
      "architecture": (platform.architecture()[0], platform.machine()),
1922
      "name": cluster.cluster_name,
1923
      "master": cluster.master_node,
1924
      "default_hypervisor": cluster.default_hypervisor,
1925
      "enabled_hypervisors": cluster.enabled_hypervisors,
1926
      "hvparams": cluster.hvparams,
1927
      "beparams": cluster.beparams,
1928
      }
1929

    
1930
    return result
1931

    
1932

    
1933
class LUQueryConfigValues(NoHooksLU):
1934
  """Return configuration values.
1935

1936
  """
1937
  _OP_REQP = []
1938
  REQ_BGL = False
1939
  _FIELDS_DYNAMIC = utils.FieldSet()
1940
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
1941

    
1942
  def ExpandNames(self):
1943
    self.needed_locks = {}
1944

    
1945
    _CheckOutputFields(static=self._FIELDS_STATIC,
1946
                       dynamic=self._FIELDS_DYNAMIC,
1947
                       selected=self.op.output_fields)
1948

    
1949
  def CheckPrereq(self):
1950
    """No prerequisites.
1951

1952
    """
1953
    pass
1954

    
1955
  def Exec(self, feedback_fn):
1956
    """Dump a representation of the cluster config to the standard output.
1957

1958
    """
1959
    values = []
1960
    for field in self.op.output_fields:
1961
      if field == "cluster_name":
1962
        entry = self.cfg.GetClusterName()
1963
      elif field == "master_node":
1964
        entry = self.cfg.GetMasterNode()
1965
      elif field == "drain_flag":
1966
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1967
      else:
1968
        raise errors.ParameterError(field)
1969
      values.append(entry)
1970
    return values
1971

    
1972

    
1973
class LUActivateInstanceDisks(NoHooksLU):
1974
  """Bring up an instance's disks.
1975

1976
  """
1977
  _OP_REQP = ["instance_name"]
1978
  REQ_BGL = False
1979

    
1980
  def ExpandNames(self):
1981
    self._ExpandAndLockInstance()
1982
    self.needed_locks[locking.LEVEL_NODE] = []
1983
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1984

    
1985
  def DeclareLocks(self, level):
1986
    if level == locking.LEVEL_NODE:
1987
      self._LockInstancesNodes()
1988

    
1989
  def CheckPrereq(self):
1990
    """Check prerequisites.
1991

1992
    This checks that the instance is in the cluster.
1993

1994
    """
1995
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1996
    assert self.instance is not None, \
1997
      "Cannot retrieve locked instance %s" % self.op.instance_name
1998

    
1999
  def Exec(self, feedback_fn):
2000
    """Activate the disks.
2001

2002
    """
2003
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2004
    if not disks_ok:
2005
      raise errors.OpExecError("Cannot activate block devices")
2006

    
2007
    return disks_info
2008

    
2009

    
2010
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2011
  """Prepare the block devices for an instance.
2012

2013
  This sets up the block devices on all nodes.
2014

2015
  Args:
2016
    instance: a ganeti.objects.Instance object
2017
    ignore_secondaries: if true, errors on secondary nodes won't result
2018
                        in an error return from the function
2019

2020
  Returns:
    a (disks_ok, device_info) tuple: disks_ok is False if the operation
    failed, and device_info is a list of
    (host, instance_visible_name, node_visible_name) tuples describing
    the mapping from node devices to instance devices
2024
  """
2025
  device_info = []
2026
  disks_ok = True
2027
  iname = instance.name
2028
  # With the two passes mechanism we try to reduce the window of
2029
  # opportunity for the race condition of switching DRBD to primary
2030
  # before handshaking occurred, but we do not eliminate it
2031

    
2032
  # The proper fix would be to wait (with some limits) until the
2033
  # connection has been made and drbd transitions from WFConnection
2034
  # into any other network-connected state (Connected, SyncTarget,
2035
  # SyncSource, etc.)
2036

    
2037
  # 1st pass, assemble on all nodes in secondary mode
2038
  for inst_disk in instance.disks:
2039
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2040
      lu.cfg.SetDiskID(node_disk, node)
2041
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2042
      if not result:
2043
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2044
                           " (is_primary=False, pass=1)",
2045
                           inst_disk.iv_name, node)
2046
        if not ignore_secondaries:
2047
          disks_ok = False
2048

    
2049
  # FIXME: race condition on drbd migration to primary
2050

    
2051
  # 2nd pass, do only the primary node
2052
  for inst_disk in instance.disks:
2053
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2054
      if node != instance.primary_node:
2055
        continue
2056
      lu.cfg.SetDiskID(node_disk, node)
2057
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2058
      if not result:
2059
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2060
                           " (is_primary=True, pass=2)",
2061
                           inst_disk.iv_name, node)
2062
        disks_ok = False
2063
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
2064

    
2065
  # leave the disks configured for the primary node
2066
  # this is a workaround that would be fixed better by
2067
  # improving the logical/physical id handling
2068
  for disk in instance.disks:
2069
    lu.cfg.SetDiskID(disk, instance.primary_node)
2070

    
2071
  return disks_ok, device_info
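  # note that device_info only describes the primary node's devices: one
  # entry per instance disk is appended in the second pass above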
2072

    
2073

    
2074
def _StartInstanceDisks(lu, instance, force):
2075
  """Start the disks of an instance.
2076

2077
  """
2078
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2079
                                           ignore_secondaries=force)
2080
  if not disks_ok:
2081
    _ShutdownInstanceDisks(lu, instance)
2082
    if force is not None and not force:
2083
      lu.proc.LogWarning("", hint="If the message above refers to a"
2084
                         " secondary node,"
2085
                         " you can retry the operation using '--force'.")
2086
    raise errors.OpExecError("Disk consistency error")
2087

    
2088

    
2089
class LUDeactivateInstanceDisks(NoHooksLU):
2090
  """Shutdown an instance's disks.
2091

2092
  """
2093
  _OP_REQP = ["instance_name"]
2094
  REQ_BGL = False
2095

    
2096
  def ExpandNames(self):
2097
    self._ExpandAndLockInstance()
2098
    self.needed_locks[locking.LEVEL_NODE] = []
2099
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2100

    
2101
  def DeclareLocks(self, level):
2102
    if level == locking.LEVEL_NODE:
2103
      self._LockInstancesNodes()
2104

    
2105
  def CheckPrereq(self):
2106
    """Check prerequisites.
2107

2108
    This checks that the instance is in the cluster.
2109

2110
    """
2111
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2112
    assert self.instance is not None, \
2113
      "Cannot retrieve locked instance %s" % self.op.instance_name
2114

    
2115
  def Exec(self, feedback_fn):
2116
    """Deactivate the disks
2117

2118
    """
2119
    instance = self.instance
2120
    _SafeShutdownInstanceDisks(self, instance)
2121

    
2122

    
2123
def _SafeShutdownInstanceDisks(lu, instance):
2124
  """Shutdown block devices of an instance.
2125

2126
  This function checks if an instance is running, before calling
2127
  _ShutdownInstanceDisks.
2128

2129
  """
2130
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
2131
                                      [instance.hypervisor])
2132
  ins_l = ins_l[instance.primary_node]
2133
  if not isinstance(ins_l, list):
2134
    raise errors.OpExecError("Can't contact node '%s'" %
2135
                             instance.primary_node)
2136

    
2137
  if instance.name in ins_l:
2138
    raise errors.OpExecError("Instance is running, can't shutdown"
2139
                             " block devices.")
2140

    
2141
  _ShutdownInstanceDisks(lu, instance)
2142

    
2143

    
2144
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2145
  """Shutdown block devices of an instance.
2146

2147
  This does the shutdown on all nodes of the instance.
2148

2149
  If ignore_primary is true, errors on the primary node are
  ignored.
2151

2152
  """
2153
  result = True
2154
  for disk in instance.disks:
2155
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2156
      lu.cfg.SetDiskID(top_disk, node)
2157
      if not lu.rpc.call_blockdev_shutdown(node, top_disk):
2158
        logging.error("Could not shutdown block device %s on node %s",
2159
                      disk.iv_name, node)
2160
        if not ignore_primary or node != instance.primary_node:
2161
          result = False
2162
  return result
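  # the boolean result is checked by callers such as LUFailoverInstance,
  # which aborts the operation if the disks could not be shut down cleanly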
2163

    
2164

    
2165
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2166
  """Checks if a node has enough free memory.
2167

2168
  This function checks whether a given node has the needed amount of
  free memory. If the node has less memory than requested, or the
  information cannot be retrieved from the node, an OpPrereqError
  exception is raised.
2172

2173
  @type lu: C{LogicalUnit}
2174
  @param lu: a logical unit from which we get configuration data
2175
  @type node: C{str}
2176
  @param node: the node to check
2177
  @type reason: C{str}
2178
  @param reason: string to use in the error message
2179
  @type requested: C{int}
2180
  @param requested: the amount of memory in MiB to check for
2181
  @type hypervisor: C{str}
2182
  @param hypervisor: the hypervisor to ask for memory stats
2183
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2184
      we cannot check the node
2185

2186
  """
2187
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2188
  if not nodeinfo or not isinstance(nodeinfo, dict):
2189
    raise errors.OpPrereqError("Could not contact node %s for resource"
2190
                             " information" % (node,))
2191

    
2192
  free_mem = nodeinfo[node].get('memory_free')
2193
  if not isinstance(free_mem, int):
2194
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2195
                             " was '%s'" % (node, free_mem))
2196
  if requested > free_mem:
2197
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2198
                             " needed %s MiB, available %s MiB" %
2199
                             (node, reason, requested, free_mem))
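  # Typical call, mirroring LUStartupInstance.CheckPrereq below:
  #   _CheckNodeFreeMemory(self, instance.primary_node,
  #                        "starting instance %s" % instance.name,
  #                        bep[constants.BE_MEMORY], instance.hypervisor)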
2200

    
2201

    
2202
class LUStartupInstance(LogicalUnit):
2203
  """Starts an instance.
2204

2205
  """
2206
  HPATH = "instance-start"
2207
  HTYPE = constants.HTYPE_INSTANCE
2208
  _OP_REQP = ["instance_name", "force"]
2209
  REQ_BGL = False
2210

    
2211
  def ExpandNames(self):
2212
    self._ExpandAndLockInstance()
2213
    self.needed_locks[locking.LEVEL_NODE] = []
2214
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2215

    
2216
  def DeclareLocks(self, level):
2217
    if level == locking.LEVEL_NODE:
2218
      self._LockInstancesNodes()
2219

    
2220
  def BuildHooksEnv(self):
2221
    """Build hooks env.
2222

2223
    This runs on master, primary and secondary nodes of the instance.
2224

2225
    """
2226
    env = {
2227
      "FORCE": self.op.force,
2228
      }
2229
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2230
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2231
          list(self.instance.secondary_nodes))
2232
    return env, nl, nl
2233

    
2234
  def CheckPrereq(self):
2235
    """Check prerequisites.
2236

2237
    This checks that the instance is in the cluster.
2238

2239
    """
2240
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2241
    assert self.instance is not None, \
2242
      "Cannot retrieve locked instance %s" % self.op.instance_name
2243

    
2244
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2245
    # check bridge existence
2246
    _CheckInstanceBridgesExist(self, instance)
2247

    
2248
    _CheckNodeFreeMemory(self, instance.primary_node,
2249
                         "starting instance %s" % instance.name,
2250
                         bep[constants.BE_MEMORY], instance.hypervisor)
2251

    
2252
  def Exec(self, feedback_fn):
2253
    """Start the instance.
2254

2255
    """
2256
    instance = self.instance
2257
    force = self.op.force
2258
    extra_args = getattr(self.op, "extra_args", "")
2259

    
2260
    self.cfg.MarkInstanceUp(instance.name)
2261

    
2262
    node_current = instance.primary_node
2263

    
2264
    _StartInstanceDisks(self, instance, force)
2265

    
2266
    if not self.rpc.call_instance_start(node_current, instance, extra_args):
2267
      _ShutdownInstanceDisks(self, instance)
2268
      raise errors.OpExecError("Could not start instance")
2269

    
2270

    
2271
class LURebootInstance(LogicalUnit):
2272
  """Reboot an instance.
2273

2274
  """
2275
  HPATH = "instance-reboot"
2276
  HTYPE = constants.HTYPE_INSTANCE
2277
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2278
  REQ_BGL = False
2279

    
2280
  def ExpandNames(self):
2281
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2282
                                   constants.INSTANCE_REBOOT_HARD,
2283
                                   constants.INSTANCE_REBOOT_FULL]:
2284
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2285
                                  (constants.INSTANCE_REBOOT_SOFT,
2286
                                   constants.INSTANCE_REBOOT_HARD,
2287
                                   constants.INSTANCE_REBOOT_FULL))
2288
    self._ExpandAndLockInstance()
2289
    self.needed_locks[locking.LEVEL_NODE] = []
2290
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2291

    
2292
  def DeclareLocks(self, level):
2293
    if level == locking.LEVEL_NODE:
2294
      primary_only = self.op.reboot_type != constants.INSTANCE_REBOOT_FULL
2295
      self._LockInstancesNodes(primary_only=primary_only)
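      # only a full reboot reassembles the disks (via _StartInstanceDisks
      # in Exec), which also touches the secondary nodes; soft and hard
      # reboots only need the primary node locked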
2296

    
2297
  def BuildHooksEnv(self):
2298
    """Build hooks env.
2299

2300
    This runs on master, primary and secondary nodes of the instance.
2301

2302
    """
2303
    env = {
2304
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2305
      }
2306
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2307
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2308
          list(self.instance.secondary_nodes))
2309
    return env, nl, nl
2310

    
2311
  def CheckPrereq(self):
2312
    """Check prerequisites.
2313

2314
    This checks that the instance is in the cluster.
2315

2316
    """
2317
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2318
    assert self.instance is not None, \
2319
      "Cannot retrieve locked instance %s" % self.op.instance_name
2320

    
2321
    # check bridge existence
2322
    _CheckInstanceBridgesExist(self, instance)
2323

    
2324
  def Exec(self, feedback_fn):
2325
    """Reboot the instance.
2326

2327
    """
2328
    instance = self.instance
2329
    ignore_secondaries = self.op.ignore_secondaries
2330
    reboot_type = self.op.reboot_type
2331
    extra_args = getattr(self.op, "extra_args", "")
2332

    
2333
    node_current = instance.primary_node
2334

    
2335
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2336
                       constants.INSTANCE_REBOOT_HARD]:
2337
      if not self.rpc.call_instance_reboot(node_current, instance,
2338
                                           reboot_type, extra_args):
2339
        raise errors.OpExecError("Could not reboot instance")
2340
    else:
2341
      if not self.rpc.call_instance_shutdown(node_current, instance):
2342
        raise errors.OpExecError("could not shutdown instance for full reboot")
2343
      _ShutdownInstanceDisks(self, instance)
2344
      _StartInstanceDisks(self, instance, ignore_secondaries)
2345
      if not self.rpc.call_instance_start(node_current, instance, extra_args):
2346
        _ShutdownInstanceDisks(self, instance)
2347
        raise errors.OpExecError("Could not start instance for full reboot")
2348

    
2349
    self.cfg.MarkInstanceUp(instance.name)
2350

    
2351

    
2352
class LUShutdownInstance(LogicalUnit):
2353
  """Shutdown an instance.
2354

2355
  """
2356
  HPATH = "instance-stop"
2357
  HTYPE = constants.HTYPE_INSTANCE
2358
  _OP_REQP = ["instance_name"]
2359
  REQ_BGL = False
2360

    
2361
  def ExpandNames(self):
2362
    self._ExpandAndLockInstance()
2363
    self.needed_locks[locking.LEVEL_NODE] = []
2364
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2365

    
2366
  def DeclareLocks(self, level):
2367
    if level == locking.LEVEL_NODE:
2368
      self._LockInstancesNodes()
2369

    
2370
  def BuildHooksEnv(self):
2371
    """Build hooks env.
2372

2373
    This runs on master, primary and secondary nodes of the instance.
2374

2375
    """
2376
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2377
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2378
          list(self.instance.secondary_nodes))
2379
    return env, nl, nl
2380

    
2381
  def CheckPrereq(self):
2382
    """Check prerequisites.
2383

2384
    This checks that the instance is in the cluster.
2385

2386
    """
2387
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2388
    assert self.instance is not None, \
2389
      "Cannot retrieve locked instance %s" % self.op.instance_name
2390

    
2391
  def Exec(self, feedback_fn):
2392
    """Shutdown the instance.
2393

2394
    """
2395
    instance = self.instance
2396
    node_current = instance.primary_node
2397
    self.cfg.MarkInstanceDown(instance.name)
2398
    if not self.rpc.call_instance_shutdown(node_current, instance):
2399
      self.proc.LogWarning("Could not shutdown instance")
2400

    
2401
    _ShutdownInstanceDisks(self, instance)
2402

    
2403

    
2404
class LUReinstallInstance(LogicalUnit):
2405
  """Reinstall an instance.
2406

2407
  """
2408
  HPATH = "instance-reinstall"
2409
  HTYPE = constants.HTYPE_INSTANCE
2410
  _OP_REQP = ["instance_name"]
2411
  REQ_BGL = False
2412

    
2413
  def ExpandNames(self):
2414
    self._ExpandAndLockInstance()
2415
    self.needed_locks[locking.LEVEL_NODE] = []
2416
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2417

    
2418
  def DeclareLocks(self, level):
2419
    if level == locking.LEVEL_NODE:
2420
      self._LockInstancesNodes()
2421

    
2422
  def BuildHooksEnv(self):
2423
    """Build hooks env.
2424

2425
    This runs on master, primary and secondary nodes of the instance.
2426

2427
    """
2428
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2429
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2430
          list(self.instance.secondary_nodes))
2431
    return env, nl, nl
2432

    
2433
  def CheckPrereq(self):
2434
    """Check prerequisites.
2435

2436
    This checks that the instance is in the cluster and is not running.
2437

2438
    """
2439
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2440
    assert instance is not None, \
2441
      "Cannot retrieve locked instance %s" % self.op.instance_name
2442

    
2443
    if instance.disk_template == constants.DT_DISKLESS:
2444
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2445
                                 self.op.instance_name)
2446
    if instance.status != "down":
2447
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2448
                                 self.op.instance_name)
2449
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2450
                                              instance.name,
2451
                                              instance.hypervisor)
2452
    if remote_info:
2453
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2454
                                 (self.op.instance_name,
2455
                                  instance.primary_node))
2456

    
2457
    self.op.os_type = getattr(self.op, "os_type", None)
2458
    if self.op.os_type is not None:
2459
      # OS verification
2460
      pnode = self.cfg.GetNodeInfo(
2461
        self.cfg.ExpandNodeName(instance.primary_node))
2462
      if pnode is None:
2463
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2464
                                   self.op.pnode)
2465
      os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
2466
      if not os_obj:
2467
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2468
                                   " primary node"  % self.op.os_type)
2469

    
2470
    self.instance = instance
2471

    
2472
  def Exec(self, feedback_fn):
2473
    """Reinstall the instance.
2474

2475
    """
2476
    inst = self.instance
2477

    
2478
    if self.op.os_type is not None:
2479
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2480
      inst.os = self.op.os_type
2481
      self.cfg.Update(inst)
2482

    
2483
    _StartInstanceDisks(self, inst, None)
2484
    try:
2485
      feedback_fn("Running the instance OS create scripts...")
2486
      if not self.rpc.call_instance_os_add(inst.primary_node, inst):
2487
        raise errors.OpExecError("Could not install OS for instance %s"
2488
                                 " on node %s" %
2489
                                 (inst.name, inst.primary_node))
2490
    finally:
2491
      _ShutdownInstanceDisks(self, inst)
2492

    
2493

    
2494
class LURenameInstance(LogicalUnit):
2495
  """Rename an instance.
2496

2497
  """
2498
  HPATH = "instance-rename"
2499
  HTYPE = constants.HTYPE_INSTANCE
2500
  _OP_REQP = ["instance_name", "new_name"]
2501

    
2502
  def BuildHooksEnv(self):
2503
    """Build hooks env.
2504

2505
    This runs on master, primary and secondary nodes of the instance.
2506

2507
    """
2508
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2509
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2510
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2511
          list(self.instance.secondary_nodes))
2512
    return env, nl, nl
2513

    
2514
  def CheckPrereq(self):
2515
    """Check prerequisites.
2516

2517
    This checks that the instance is in the cluster and is not running.
2518

2519
    """
2520
    instance = self.cfg.GetInstanceInfo(
2521
      self.cfg.ExpandInstanceName(self.op.instance_name))
2522
    if instance is None:
2523
      raise errors.OpPrereqError("Instance '%s' not known" %
2524
                                 self.op.instance_name)
2525
    if instance.status != "down":
2526
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2527
                                 self.op.instance_name)
2528
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2529
                                              instance.name,
2530
                                              instance.hypervisor)
2531
    if remote_info:
2532
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2533
                                 (self.op.instance_name,
2534
                                  instance.primary_node))
2535
    self.instance = instance
2536

    
2537
    # new name verification
2538
    name_info = utils.HostInfo(self.op.new_name)
2539

    
2540
    self.op.new_name = new_name = name_info.name
2541
    instance_list = self.cfg.GetInstanceList()
2542
    if new_name in instance_list:
2543
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2544
                                 new_name)
2545

    
2546
    if not getattr(self.op, "ignore_ip", False):
2547
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2548
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2549
                                   (name_info.ip, new_name))
2550

    
2551

    
2552
  def Exec(self, feedback_fn):
2553
    """Reinstall the instance.
2554

2555
    """
2556
    inst = self.instance
2557
    old_name = inst.name
2558

    
2559
    if inst.disk_template == constants.DT_FILE:
2560
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2561

    
2562
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2563
    # Change the instance lock. This is definitely safe while we hold the BGL
2564
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2565
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2566

    
2567
    # re-read the instance from the configuration after rename
2568
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2569

    
2570
    if inst.disk_template == constants.DT_FILE:
2571
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2572
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2573
                                                     old_file_storage_dir,
2574
                                                     new_file_storage_dir)
2575

    
2576
      if not result:
2577
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2578
                                 " directory '%s' to '%s' (but the instance"
2579
                                 " has been renamed in Ganeti)" % (
2580
                                 inst.primary_node, old_file_storage_dir,
2581
                                 new_file_storage_dir))
2582

    
2583
      if not result[0]:
2584
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2585
                                 " (but the instance has been renamed in"
2586
                                 " Ganeti)" % (old_file_storage_dir,
2587
                                               new_file_storage_dir))
2588

    
2589
    _StartInstanceDisks(self, inst, None)
2590
    try:
2591
      if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
2592
                                               old_name):
2593
        msg = ("Could not run OS rename script for instance %s on node %s"
2594
               " (but the instance has been renamed in Ganeti)" %
2595
               (inst.name, inst.primary_node))
2596
        self.proc.LogWarning(msg)
2597
    finally:
2598
      _ShutdownInstanceDisks(self, inst)
2599

    
2600

    
2601
class LURemoveInstance(LogicalUnit):
2602
  """Remove an instance.
2603

2604
  """
2605
  HPATH = "instance-remove"
2606
  HTYPE = constants.HTYPE_INSTANCE
2607
  _OP_REQP = ["instance_name", "ignore_failures"]
2608
  REQ_BGL = False
2609

    
2610
  def ExpandNames(self):
2611
    self._ExpandAndLockInstance()
2612
    self.needed_locks[locking.LEVEL_NODE] = []
2613
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2614

    
2615
  def DeclareLocks(self, level):
2616
    if level == locking.LEVEL_NODE:
2617
      self._LockInstancesNodes()
2618

    
2619
  def BuildHooksEnv(self):
2620
    """Build hooks env.
2621

2622
    This runs on master, primary and secondary nodes of the instance.
2623

2624
    """
2625
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2626
    nl = [self.cfg.GetMasterNode()]
2627
    return env, nl, nl
2628

    
2629
  def CheckPrereq(self):
2630
    """Check prerequisites.
2631

2632
    This checks that the instance is in the cluster.
2633

2634
    """
2635
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2636
    assert self.instance is not None, \
2637
      "Cannot retrieve locked instance %s" % self.op.instance_name
2638

    
2639
  def Exec(self, feedback_fn):
2640
    """Remove the instance.
2641

2642
    """
2643
    instance = self.instance
2644
    logging.info("Shutting down instance %s on node %s",
2645
                 instance.name, instance.primary_node)
2646

    
2647
    if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
2648
      if self.op.ignore_failures:
2649
        feedback_fn("Warning: can't shutdown instance")
2650
      else:
2651
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2652
                                 (instance.name, instance.primary_node))
2653

    
2654
    logging.info("Removing block devices for instance %s", instance.name)
2655

    
2656
    if not _RemoveDisks(self, instance):
2657
      if self.op.ignore_failures:
2658
        feedback_fn("Warning: can't remove instance's disks")
2659
      else:
2660
        raise errors.OpExecError("Can't remove instance's disks")
2661

    
2662
    logging.info("Removing instance %s out of cluster config", instance.name)
2663

    
2664
    self.cfg.RemoveInstance(instance.name)
2665
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2666

    
2667

    
2668
class LUQueryInstances(NoHooksLU):
2669
  """Logical unit for querying instances.
2670

2671
  """
2672
  _OP_REQP = ["output_fields", "names"]
2673
  REQ_BGL = False
2674
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
2675
                                    "admin_state", "admin_ram",
2676
                                    "disk_template", "ip", "mac", "bridge",
2677
                                    "sda_size", "sdb_size", "vcpus", "tags",
2678
                                    "network_port", "beparams",
2679
                                    "(disk).(size)/([0-9]+)",
2680
                                    "(disk).(sizes)",
2681
                                    "(nic).(mac|ip|bridge)/([0-9]+)",
2682
                                    "(nic).(macs|ips|bridges)",
2683
                                    "(disk|nic).(count)",
2684
                                    "serial_no", "hypervisor", "hvparams",] +
2685
                                  ["hv/%s" % name
2686
                                   for name in constants.HVS_PARAMETERS] +
2687
                                  ["be/%s" % name
2688
                                   for name in constants.BES_PARAMETERS])
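  # the parenthesized entries above are regular expressions: FieldSet
  # matches them against the requested field names and the captured groups
  # (e.g. "disk"/"nic", the attribute, an optional index) drive the
  # per-field lookups in Exec below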
2689
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
2690

    
2691

    
2692
  def ExpandNames(self):
2693
    _CheckOutputFields(static=self._FIELDS_STATIC,
2694
                       dynamic=self._FIELDS_DYNAMIC,
2695
                       selected=self.op.output_fields)
2696

    
2697
    self.needed_locks = {}
2698
    self.share_locks[locking.LEVEL_INSTANCE] = 1
2699
    self.share_locks[locking.LEVEL_NODE] = 1
2700

    
2701
    if self.op.names:
2702
      self.wanted = _GetWantedInstances(self, self.op.names)
2703
    else:
2704
      self.wanted = locking.ALL_SET
2705

    
2706
    self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2707
    if self.do_locking:
2708
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2709
      self.needed_locks[locking.LEVEL_NODE] = []
2710
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2711

    
2712
  def DeclareLocks(self, level):
2713
    if level == locking.LEVEL_NODE and self.do_locking:
2714
      self._LockInstancesNodes()
2715

    
2716
  def CheckPrereq(self):
2717
    """Check prerequisites.
2718

2719
    """
2720
    pass
2721

    
2722
  def Exec(self, feedback_fn):
2723
    """Computes the list of nodes and their attributes.
2724

2725
    """
2726
    all_info = self.cfg.GetAllInstancesInfo()
2727
    if self.do_locking:
2728
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2729
    elif self.wanted != locking.ALL_SET:
2730
      instance_names = self.wanted
2731
      missing = set(instance_names).difference(all_info.keys())
2732
      if missing:
2733
        raise errors.OpExecError(
2734
          "Some instances were removed before retrieving their data: %s"
2735
          % missing)
2736
    else:
2737
      instance_names = all_info.keys()
2738

    
2739
    instance_names = utils.NiceSort(instance_names)
2740
    instance_list = [all_info[iname] for iname in instance_names]
2741

    
2742
    # begin data gathering
2743

    
2744
    nodes = frozenset([inst.primary_node for inst in instance_list])
2745
    hv_list = list(set([inst.hypervisor for inst in instance_list]))
2746

    
2747
    bad_nodes = []
2748
    if self.do_locking:
2749
      live_data = {}
2750
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
2751
      for name in nodes:
2752
        result = node_data[name]
2753
        if result:
2754
          live_data.update(result)
2755
        elif result == False:
2756
          bad_nodes.append(name)
2757
        # else no instance is alive
2758
    else:
2759
      live_data = dict([(name, {}) for name in instance_names])
2760

    
2761
    # end data gathering
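    # the "status" field computed below combines the configured (admin)
    # state with the live data gathered above: running, ERROR_up,
    # ERROR_down, ADMIN_down, or ERROR_nodedown when the primary node
    # could not be contacted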
2762

    
2763
    HVPREFIX = "hv/"
2764
    BEPREFIX = "be/"
2765
    output = []
2766
    for instance in instance_list:
2767
      iout = []
2768
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
2769
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
2770
      for field in self.op.output_fields:
2771
        st_match = self._FIELDS_STATIC.Matches(field)
2772
        if field == "name":
2773
          val = instance.name
2774
        elif field == "os":
2775
          val = instance.os
2776
        elif field == "pnode":
2777
          val = instance.primary_node
2778
        elif field == "snodes":
2779
          val = list(instance.secondary_nodes)
2780
        elif field == "admin_state":
2781
          val = (instance.status != "down")
2782
        elif field == "oper_state":
2783
          if instance.primary_node in bad_nodes:
2784
            val = None
2785
          else:
2786
            val = bool(live_data.get(instance.name))
2787
        elif field == "status":
2788
          if instance.primary_node in bad_nodes:
2789
            val = "ERROR_nodedown"
2790
          else:
2791
            running = bool(live_data.get(instance.name))
2792
            if running:
2793
              if instance.status != "down":
2794
                val = "running"
2795
              else:
2796
                val = "ERROR_up"
2797
            else:
2798
              if instance.status != "down":
2799
                val = "ERROR_down"
2800
              else:
2801
                val = "ADMIN_down"
2802
        elif field == "oper_ram":
2803
          if instance.primary_node in bad_nodes:
2804
            val = None
2805
          elif instance.name in live_data:
2806
            val = live_data[instance.name].get("memory", "?")
2807
          else:
2808
            val = "-"
2809
        elif field == "disk_template":
2810
          val = instance.disk_template
2811
        elif field == "ip":
2812
          val = instance.nics[0].ip
2813
        elif field == "bridge":
2814
          val = instance.nics[0].bridge
2815
        elif field == "mac":
2816
          val = instance.nics[0].mac
2817
        elif field == "sda_size" or field == "sdb_size":
2818
          idx = ord(field[2]) - ord('a')
2819
          try:
2820
            val = instance.FindDisk(idx).size
2821
          except errors.OpPrereqError:
2822
            val = None
2823
        elif field == "tags":
2824
          val = list(instance.GetTags())
2825
        elif field == "serial_no":
2826
          val = instance.serial_no
2827
        elif field == "network_port":
2828
          val = instance.network_port
2829
        elif field == "hypervisor":
2830
          val = instance.hypervisor
2831
        elif field == "hvparams":
2832
          val = i_hv
2833
        elif (field.startswith(HVPREFIX) and
2834
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
2835
          val = i_hv.get(field[len(HVPREFIX):], None)
2836
        elif field == "beparams":
2837
          val = i_be
2838
        elif (field.startswith(BEPREFIX) and
2839
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
2840
          val = i_be.get(field[len(BEPREFIX):], None)
2841
        elif st_match and st_match.groups():
2842
          # matches a variable list
2843
          st_groups = st_match.groups()
2844
          if st_groups and st_groups[0] == "disk":
2845
            if st_groups[1] == "count":
2846
              val = len(instance.disks)
2847
            elif st_groups[1] == "sizes":
2848
              val = [disk.size for disk in instance.disks]
2849
            elif st_groups[1] == "size":
2850
              try:
2851
                val = instance.FindDisk(st_groups[2]).size
2852
              except errors.OpPrereqError:
2853
                val = None
2854
            else:
2855
              assert False, "Unhandled disk parameter"
2856
          elif st_groups[0] == "nic":
2857
            if st_groups[1] == "count":
2858
              val = len(instance.nics)
2859
            elif st_groups[1] == "macs":
2860
              val = [nic.mac for nic in instance.nics]
2861
            elif st_groups[1] == "ips":
2862
              val = [nic.ip for nic in instance.nics]
2863
            elif st_groups[1] == "bridges":
2864
              val = [nic.bridge for nic in instance.nics]
2865
            else:
2866
              # index-based item
2867
              nic_idx = int(st_groups[2])
2868
              if nic_idx >= len(instance.nics):
2869
                val = None
2870
              else:
2871
                if st_groups[1] == "mac":
2872
                  val = instance.nics[nic_idx].mac
2873
                elif st_groups[1] == "ip":
2874
                  val = instance.nics[nic_idx].ip
2875
                elif st_groups[1] == "bridge":
2876
                  val = instance.nics[nic_idx].bridge
2877
                else:
2878
                  assert False, "Unhandled NIC parameter"
2879
          else:
2880
            assert False, "Unhandled variable parameter"
2881
        else:
2882
          raise errors.ParameterError(field)
2883
        iout.append(val)
2884
      output.append(iout)
2885

    
2886
    return output
2887

    
2888

    
2889
class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
                         instance.name, bep[constants.BE_MEMORY],
                         instance.hypervisor)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not self.rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s do not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self, dev, target_node, False):
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 instance.name, source_node)

    if not self.rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
                             " Proceeding"
                             " anyway. Please make sure node %s is down",
                             instance.name, source_node, source_node)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logging.info("Starting instance %s on node %s",
                   instance.name, target_node)

      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not self.rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))
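
# Note (added commentary, not part of the original module): the failover
# sequence implemented by LUFailoverInstance.Exec above is: check DRBD
# consistency on the secondary, shut the instance down on the current primary,
# deactivate its disks there, flip instance.primary_node in the cluster
# configuration, and, only if the instance was marked "up", reassemble the
# disks and start it on the former secondary.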


def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
        return False

  lu.cfg.SetDiskID(device, node)
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, True, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(lu, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  lu.cfg.SetDiskID(device, node)
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True
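
# Note (added commentary, not part of the original module): the two helpers
# above differ mainly in the boolean passed to call_blockdev_create (True on
# the primary, False on secondaries) and in the 'force' logic: on a secondary
# a device is only physically created if its type requires secondary creation
# (device.CreateOnSecondary()) or an ancestor already forced it; otherwise the
# recursion merely descends into the children.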


def _GenerateUniqueNames(lu, exts):
  """Generate suitable LV names.

  This will generate one unique logical volume name for each of the
  given extensions.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results
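
# Illustrative sketch (the exact shape of GenerateUniqueID()'s result is an
# assumption; only the concatenation is taken from the code above):
#   _GenerateUniqueNames(lu, [".disk0_data", ".disk0_meta"])
# would return something like
#   ["<id1>.disk0_data", "<id2>.disk0_meta"]
# with a fresh unique id generated per extension and the extension appended.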


def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
                         p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  port = lu.cfg.AllocatePort()
  vgname = lu.cfg.GetVGName()
  shared_secret = lu.cfg.GenerateDRBDSecret()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev
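
# Illustrative sketch (added commentary, not part of the original module):
# the object returned above is a small device tree,
#   LD_DRBD8 (size=<size>,
#             logical_id=(primary, secondary, port, p_minor, s_minor, secret))
#     +- LD_LV names[0], size=<size>   (data volume)
#     +- LD_LV names[1], size=128      (DRBD metadata volume)
# which the _CreateBlockDevOn* helpers earlier in this file create bottom-up,
# children first.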


def _GenerateDiskTemplate(lu, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_info,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []
  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(lu, [".disk%d" % i
                                      for i in range(disk_count)])
    for idx, disk in enumerate(disk_info):
      disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
                              logical_id=(vgname, names[idx]),
                              iv_name="disk/%d" % idx)
      disks.append(disk_dev)
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node, remote_node] * len(disk_info), instance_name)

    names = _GenerateUniqueNames(lu,
                                 [".disk%d_%s" % (i, s)
                                  for i in range(disk_count)
                                  for s in ("data", "meta")
                                  ])
    for idx, disk in enumerate(disk_info):
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                      disk["size"], names[idx*2:idx*2+2],
                                      "disk/%d" % idx,
                                      minors[idx*2], minors[idx*2+1])
      disks.append(disk_dev)
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    for idx, disk in enumerate(disk_info):

      disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
                              iv_name="disk/%d" % idx,
                              logical_id=(file_driver,
                                          "%s/disk%d" % (file_storage_dir,
                                                         idx)))
      disks.append(disk_dev)
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks
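
# Illustrative sketch (added commentary, not part of the original module):
# for a DRBD8 instance with disk_info = [{"size": 1024}, {"size": 2048}] the
# code above requests four unique names (".disk0_data", ".disk0_meta",
# ".disk1_data", ".disk1_meta") and four DRBD minors allocated alternately on
# (primary, secondary, primary, secondary); disk 0 then uses names[0:2] and
# minors[0:2], disk 1 uses names[2:4] and minors[2:4].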


def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


def _CreateDisks(lu, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
                                                 file_storage_dir)

    if not result:
      logging.error("Could not connect to node '%s'", instance.primary_node)
      return False

    if not result[0]:
      logging.error("Failed to create directory '%s'", file_storage_dir)
      return False

  for device in instance.disks:
    logging.info("Creating volume %s for instance %s",
                 device.iv_name, instance.name)
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
                                        device, False, info):
        logging.error("Failed to create volume %s (%s) on secondary node %s!",
                      device.iv_name, device, secondary_node)
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
                                    instance, device, info):
      logging.error("Failed to create volume %s on primary!", device.iv_name)
      return False

  return True
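
# Note (added commentary, not part of the original module): disks are created
# on the secondary node(s) first and on the primary last; presumably this is
# so the primary-side DRBD device finds its peer's backing devices already in
# place when it is created.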


def _RemoveDisks(lu, instance):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logging.info("Removing block devices for instance %s", instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(disk, node)
      if not lu.rpc.call_blockdev_remove(node, disk):
        lu.proc.LogWarning("Could not remove block device %s on node %s,"
                           " continuing anyway", device.iv_name, node)
        result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
                                               file_storage_dir):
      logging.error("Could not remove directory '%s'", file_storage_dir)
      result = False

  return result


def _ComputeDiskSize(disk_template, disks):
  """Compute disk size requirements in the volume group

  The computation is done for the given disk template and list of
  disk definitions.

  """
  # Required free disk space as a function of the disk template and sizes
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: sum(d["size"] for d in disks),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]
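
# Worked example (illustrative, not part of the original code):
#   _ComputeDiskSize(constants.DT_DRBD8, [{"size": 1024}, {"size": 2048}])
# returns (1024 + 128) + (2048 + 128) = 3328 MB of required volume group
# space, while DT_FILE and DT_DISKLESS yield None because they do not consume
# LVM space.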


def _CheckHVParams(lu, nodenames, hvname, hvparams):
  """Hypervisor parameter validation.

  This function abstracts the hypervisor parameter validation to be
  used in both instance create and instance modify.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type nodenames: list
  @param nodenames: the list of nodes on which we should check
  @type hvname: string
  @param hvname: the name of the hypervisor we should use
  @type hvparams: dict
  @param hvparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
                                                  hvname,
                                                  hvparams)
  for node in nodenames:
    info = hvinfo.get(node, None)
    if not info or not isinstance(info, (tuple, list)):
      raise errors.OpPrereqError("Cannot get current information"
                                 " from node '%s' (%s)" % (node, info))
    if not info[0]:
      raise errors.OpPrereqError("Hypervisor parameter validation failed:"
                                 " %s" % info[1])


class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disks", "disk_template",
              "mode", "start",
              "wait_for_sync", "ip_check", "nics",
              "hvparams", "beparams"]
  REQ_BGL = False

  def _ExpandNode(self, node):
    """Expands and checks one node name.

    """
    node_full = self.cfg.ExpandNodeName(node)
    if node_full is None:
      raise errors.OpPrereqError("Unknown node %s" % node)
    return node_full

  def ExpandNames(self):
    """ExpandNames for CreateInstance.

    Figure out the right locks for instance creation.

    """
    self.needed_locks = {}

    # set optional parameters to none if they don't exist
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    # cheap checks, mostly valid constants given

    # verify creation mode
    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    if self.op.hypervisor is None:
      self.op.hypervisor = self.cfg.GetHypervisorType()

    cluster = self.cfg.GetClusterInfo()
    enabled_hvs = cluster.enabled_hypervisors
    if self.op.hypervisor not in enabled_hvs:
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
                                 " cluster (%s)" % (self.op.hypervisor,
                                  ",".join(enabled_hvs)))

    # check hypervisor parameter syntax (locally)

    filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
                                  self.op.hvparams)
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
    hv_type.CheckParameterSyntax(filled_hvp)

    # fill and remember the beparams dict
    self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                    self.op.beparams)

    #### instance parameters check

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)
    self.op.instance_name = instance_name = hostname1.name

    # this is just a preventive check, but someone might still add this
    # instance in the meantime, and creation will fail at lock-add time
    if instance_name in self.cfg.GetInstanceList():
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    self.add_locks[locking.LEVEL_INSTANCE] = instance_name

    # NIC buildup
    self.nics = []
    for nic in self.op.nics:
      # ip validity checks
      ip = nic.get("ip", None)
      if ip is None or ip.lower() == "none":
        nic_ip = None
      elif ip.lower() == constants.VALUE_AUTO:
        nic_ip = hostname1.ip
      else:
        if not utils.IsValidIP(ip):
          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
                                     " like a valid IP" % ip)
        nic_ip = ip

      # MAC address verification
      mac = nic.get("mac", constants.VALUE_AUTO)
      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
        if not utils.IsValidMac(mac.lower()):
          raise errors.OpPrereqError("Invalid MAC address specified: %s" %
                                     mac)
      # bridge verification
      bridge = nic.get("bridge", self.cfg.GetDefBridge())
      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))

    # disk checks/pre-build
    self.disks = []
    for disk in self.op.disks:
      mode = disk.get("mode", constants.DISK_RDWR)
      if mode not in constants.DISK_ACCESS_SET:
        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                   mode)
      size = disk.get("size", None)
      if size is None:
        raise errors.OpPrereqError("Missing disk size")
      try:
        size = int(size)
      except ValueError:
        raise errors.OpPrereqError("Invalid disk size '%s'" % size)
      self.disks.append({"size": size, "mode": mode})

    # used in CheckPrereq for ip ping check
    self.check_ip = hostname1.ip

    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory path not absolute")

    ### Node/iallocator related checks
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.op.pnode = self._ExpandNode(self.op.pnode)
      nodelist = [self.op.pnode]
      if self.op.snode is not None:
        self.op.snode = self._ExpandNode(self.op.snode)
        nodelist.append(self.op.snode)
      self.needed_locks[locking.LEVEL_NODE] = nodelist

    # in case of import lock the source node too
    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)

      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      self.op.src_node = src_node = self._ExpandNode(src_node)
      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
        self.needed_locks[locking.LEVEL_NODE].append(src_node)

    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")
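
  # Illustrative sketch (the example values are assumptions; the dict-based
  # format itself is what the loops above expect): self.op.nics and
  # self.op.disks arrive as lists of plain dicts, e.g.
  #   nics  = [{"mac": constants.VALUE_AUTO, "ip": "none"}]
  #   disks = [{"size": 1024}, {"size": 2048, "mode": constants.DISK_RDWR}]
  # and ExpandNames normalizes them into objects.NIC instances (self.nics)
  # and {"size", "mode"} dicts (self.disks).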

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    nics = [n.ToDict() for n in self.nics]
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.be_full[constants.BE_VCPUS],
                     mem_size=self.be_full[constants.BE_MEMORY],
                     disks=self.disks,
                     nics=nics,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 ", ".join(ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGES"] = self.src_images

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.be_full[constants.BE_MEMORY],
      vcpus=self.be_full[constants.BE_VCPUS],
      nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
    ))

    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl


  def CheckPrereq(self):
    """Check prerequisites.

    """
    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")


    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = self.op.src_node
      src_path = self.op.src_path

      export_info = self.rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      # Check that the new instance doesn't have less disks than the export
      instance_disks = len(self.disks)
      export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
      if instance_disks < export_disks:
        raise errors.OpPrereqError("Not enough disks to import."
                                   " (instance: %d, export: %d)" %
                                   (instance_disks, export_disks))

      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      disk_images = []
      for idx in range(export_disks):
        option = 'disk%d_dump' % idx
        if export_info.has_option(constants.INISECT_INS, option):
          # FIXME: are the old os-es, disk sizes, etc. useful?
          export_name = export_info.get(constants.INISECT_INS, option)
          image = os.path.join(src_path, export_name)
          disk_images.append(image)
        else:
          disk_images.append(False)

      self.src_images = disk_images

      if self.op.mac == constants.VALUE_AUTO:
        old_name = export_info.get(constants.INISECT_INS, 'name')
        if self.op.instance_name == old_name:
          # FIXME: adjust every nic, when we'll be able to create instances
          # with more than one
          if int(export_info.get(constants.INISECT_INS, 'nic_count')) >= 1:
            self.op.mac = export_info.get(constants.INISECT_INS, 'nic_0_mac')

    # ip ping checks (we use the same ip that was resolved in ExpandNames)

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (self.check_ip, self.op.instance_name))

    #### allocator run

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
    assert self.pnode is not None, \
      "Cannot retrieve locked node %s" % self.op.pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if self.op.snode is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")
      if self.op.snode == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(self.op.snode)

    nodenames = [pnode.name] + self.secondaries

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.disks)

    # Check lv size requirements
    if req_size is not None:
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                         self.op.hypervisor)
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)

    # os verification
    os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node" % self.op.os_type)

    # bridge check on primary node
    bridges = [n.bridge for n in self.nics]
    if not self.rpc.call_bridges_exist(self.pnode.name, bridges):
      raise errors.OpPrereqError("one of the target bridges '%s' does not"
                                 " exist on"
                                 " destination node '%s'" %
                                 (",".join(bridges), pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.be_full[constants.BE_MEMORY],
                           self.op.hypervisor)

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    for nic in self.nics:
      if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
        nic.mac = self.cfg.GenerateMAC()

    ht_kind = self.op.hypervisor
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    ##if self.op.vnc_bind_address is None:
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.cfg.GetFileStorageDir(),
                                        string_file_storage_dir, instance))


    disks = _GenerateDiskTemplate(self,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries,
                                  self.disks,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            nics=self.nics, disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            beparams=self.op.beparams,
                            hvparams=self.op.hvparams,
                            hypervisor=self.op.hypervisor,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self, iobj):
      _RemoveDisks(self, iobj)
      self.cfg.ReleaseDRBDMinors(instance)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Declare that we don't want to remove the instance lock anymore, as we've
    # added the instance to the config
    del self.remove_locks[locking.LEVEL_INSTANCE]
    # Remove the temp. assignments for the instance's drbds
    self.cfg.ReleaseDRBDMinors(instance)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, iobj)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(self, iobj)
      self.cfg.RemoveInstance(iobj.name)
      # Make sure the instance lock gets removed
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not self.rpc.call_instance_os_add(pnode_name, iobj):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_images = self.src_images
        cluster_name = self.cfg.GetClusterName()
        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
                                                         src_node, src_images,
                                                         cluster_name)
        for idx, result in enumerate(import_result):
          if not result:
            self.LogWarning("Could not import image %s on instance %s,"
                            " disk %d, on node %s" %
                            (src_images[idx], instance, idx, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logging.info("Starting instance %s on node %s", instance, pnode_name)
      feedback_fn("* starting instance...")
      if not self.rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")
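
# Note (added commentary, not part of the original module): Exec above
# proceeds roughly as follows: generate MACs and an optional network port,
# build the disk objects via _GenerateDiskTemplate, physically create them
# with _CreateDisks (reverting and releasing the temporary DRBD minors on
# failure), add the instance to the configuration, optionally wait for the
# mirrors to sync, run the OS create or import scripts, and finally start the
# instance if requested.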


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = self.rpc.call_instance_list([node],
                                             [instance.hypervisor])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logging.debug("Connecting to console of %s on %s", instance.name, node)

    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    console_cmd = hyper.GetShellCommandForConsole(instance)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
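
# Note (added commentary, not part of the original module): the value returned
# by Exec above (the ssh command built by self.ssh.BuildCmd) is not executed
# by the master daemon itself; as the class docstring says, it is the command
# line that the caller, for example the instance console client, runs on the
# master node to reach the instance's console.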


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    elif self.op.remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.op.remote_node = remote_node
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    # If we're not already locking all nodes in the set we have to declare the
    # instance's primary/secondary nodes.
    if (level == locking.LEVEL_NODE and
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
      self._LockInstancesNodes()

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    self.LogInfo("Selected new secondary for the instance: %s",
                 self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    if not self.op.disks:
      self.op.disks = range(len(instance.disks))

    for disk_idx in self.op.disks: