Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib.py @ 6605411d

History | View | Annotate | Download (187.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33
import logging
34
import copy
35

    
36
from ganeti import ssh
37
from ganeti import logger
38
from ganeti import utils
39
from ganeti import errors
40
from ganeti import hypervisor
41
from ganeti import locking
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import opcodes
45
from ganeti import serializer
46

    
47

    
48
class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_MASTER: the LU needs to run on the master node
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  # Names of the opcode attributes which must be set (i.e. not None)
  _OP_REQP = []
  REQ_MASTER = True
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    Raises errors.OpPrereqError if one of the parameters listed in
    _OP_REQP is missing from the opcode, if the cluster is not
    initialized, or if REQ_MASTER is set and the local host is not the
    master node.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    # The SshRunner is created lazily, see __GetSSH
    self.__ssh = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    if not self.cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    if self.REQ_MASTER:
      master = self.cfg.GetMasterNode()
      if master != utils.HostInfo().name:
        raise errors.OpPrereqError("Commands must be run on the master"
                                   " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object, creating it on first use.

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:
      - Use an empty dict if you don't need any lock
      - If you don't need any lock at a particular level omit that level
      - Don't put anything for the BGL level
      - If you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples:
    # Acquire all nodes and one instance
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
    }
    # Acquire just two nodes
    self.needed_locks = {
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
    }
    # Acquire no locks
    self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    Args:
      phase: the hooks phase that has just been run
      hook_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: the previous result this LU had, or None in the PRE phase.

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    Raises errors.OpPrereqError if the instance name cannot be expanded.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with the primary nodes (and, unless primary_only is set, also the
    secondary nodes) of the instances listed in
    self.acquired_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to:

    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we're really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    # The recalculation request has been served, forget about it
    del self.recalculate_locks[locking.LEVEL_NODE]
310

    
311

    
312
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  # Both are left unset: no hooks path and no hooks type for this LU
  HPATH = None
  HTYPE = None
321

    
322

    
323
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    lu: the logical unit on whose behalf we execute (its cfg is used to
        expand the names)
    nodes: non-empty list of node names (strings)

  Returns:
    the expanded node names, sorted with utils.NiceSort

  Raises:
    errors.OpPrereqError: if nodes is not a list or a name is unknown
    errors.ProgrammerError: if nodes is an empty list

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  # Unlike _GetWantedInstances, an empty list does not mean "all nodes"
  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)
345

    
346

    
347
def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    lu: the logical unit on whose behalf we execute (its cfg is used to
        expand the names)
    instances: list of instance names (strings); an empty list selects
               all the instances known to the configuration

  Returns:
    the expanded instance names, sorted with utils.NiceSort

  Raises:
    errors.OpPrereqError: if instances is not a list or a name is unknown

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)
369

    
370

    
371
def _CheckOutputFields(static, dynamic, selected):
372
  """Checks whether all selected fields are valid.
373

374
  Args:
375
    static: Static fields
376
    dynamic: Dynamic fields
377

378
  """
379
  static_fields = frozenset(static)
380
  dynamic_fields = frozenset(dynamic)
381

    
382
  all_fields = static_fields | dynamic_fields
383

    
384
  if not all_fields.issuperset(selected):
385
    raise errors.OpPrereqError("Unknown output fields selected: %s"
386
                               % ",".join(frozenset(selected).
387
                                          difference(all_fields)))
388

    
389

    
390
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
391
                          memory, vcpus, nics):
392
  """Builds instance related env variables for hooks from single variables.
393

394
  Args:
395
    secondary_nodes: List of secondary nodes as strings
396
  """
397
  env = {
398
    "OP_TARGET": name,
399
    "INSTANCE_NAME": name,
400
    "INSTANCE_PRIMARY": primary_node,
401
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
402
    "INSTANCE_OS_TYPE": os_type,
403
    "INSTANCE_STATUS": status,
404
    "INSTANCE_MEMORY": memory,
405
    "INSTANCE_VCPUS": vcpus,
406
  }
407

    
408
  if nics:
409
    nic_count = len(nics)
410
    for idx, (ip, bridge, mac) in enumerate(nics):
411
      if ip is None:
412
        ip = ""
413
      env["INSTANCE_NIC%d_IP" % idx] = ip
414
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
415
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
416
  else:
417
    nic_count = 0
418

    
419
  env["INSTANCE_NIC_COUNT"] = nic_count
420

    
421
  return env
422

    
423

    
424
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    lu: the logical unit on whose behalf we execute (its cfg provides the
        cluster-level backend parameter defaults)
    instance: objects.Instance object of instance
    override: dict of values to override; the keys are the parameter
              names of _BuildInstanceHookEnv

  Returns:
    The dict built by _BuildInstanceHookEnv

  """
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    # the instance status, not its OS, is what INSTANCE_STATUS must hold
    'status': instance.status,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)
445

    
446

    
447
def _CheckInstanceBridgesExist(lu, instance):
448
  """Check that the brigdes needed by an instance exist.
449

450
  """
451
  # check bridges existance
452
  brlist = [nic.bridge for nic in instance.nics]
453
  if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
454
    raise errors.OpPrereqError("one or more target bridges %s does not"
455
                               " exist on destination node '%s'" %
456
                               (brlist, instance.primary_node))
457

    
458

    
459
class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty, i.e. that the master is the
    only node left and that no instances are defined.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      # Report the nodes other than the master, so that the count is right
      # also when the only node in the list is not the master itself
      other_nodes = [name for name in nodelist if name != master]
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % len(other_nodes))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    Stops the master role on the master node and creates backups of the
    ssh key files of the constants.GANETI_RUNAS user.

    Returns the name of the master node; raises errors.OpExecError if
    the master role could not be disabled.

    """
    master = self.cfg.GetMasterNode()
    if not self.rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master
495

    
496

    
497
class LUVerifyCluster(LogicalUnit):
498
  """Verifies the cluster status.
499

500
  """
501
  HPATH = "cluster-verify"
502
  HTYPE = constants.HTYPE_CLUSTER
503
  _OP_REQP = ["skip_checks"]
504
  REQ_BGL = False
505

    
506
  def ExpandNames(self):
    """Declare the locks needed: all nodes and all instances, shared.

    """
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    # A true value marks the locks of a level as shared
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
512

    
513
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and minimum size (constants.MIN_VG_SIZE)
      - checks config file checksum
      - checks ssh to other nodes
      - checks tcp connectivity to other nodes
      - checks the hypervisor verification results

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums
      vglist: the volume group data returned by the node
      node_result: the dictionary returned by the node verify call
      remote_version: the protocol version returned by the node
      feedback_fn: function used to report the problems found

    Returns:
      True if a problem was found (i.e. the node is bad), False otherwise

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % node)
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and minimum size

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    if not node_result:
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
      return True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        # a separate loop variable keeps the 'node' argument intact
        for failed_node in node_result['nodelist']:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (failed_node, node_result['nodelist'][failed_node]))
    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        for failed_node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (failed_node,
                           node_result['node-net-test'][failed_node]))

    hyp_result = node_result.get('hypervisor', None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.items():
        if hv_result is not None:
          # reported as an error, hence it must also fail the verification
          bad = True
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))
    return bad
601

    
602
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
603
                      node_instance, feedback_fn):
604
    """Verify an instance.
605

606
    This function checks to see if the required block devices are
607
    available on the instance's node.
608

609
    """
610
    bad = False
611

    
612
    node_current = instanceconfig.primary_node
613

    
614
    node_vol_should = {}
615
    instanceconfig.MapLVsByNode(node_vol_should)
616

    
617
    for node in node_vol_should:
618
      for volume in node_vol_should[node]:
619
        if node not in node_vol_is or volume not in node_vol_is[node]:
620
          feedback_fn("  - ERROR: volume %s missing on node %s" %
621
                          (volume, node))
622
          bad = True
623

    
624
    if not instanceconfig.status == 'down':
625
      if (node_current not in node_instance or
626
          not instance in node_instance[node_current]):
627
        feedback_fn("  - ERROR: instance %s not running on node %s" %
628
                        (instance, node_current))
629
        bad = True
630

    
631
    for node in node_instance:
632
      if (not node == node_current):
633
        if instance in node_instance[node]:
634
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
635
                          (instance, node))
636
          bad = True
637

    
638
    return bad
639

    
640
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
641
    """Verify if there are any unknown volumes in the cluster.
642

643
    The .os, .swap and backup volumes are ignored. All other volumes are
644
    reported as unknown.
645

646
    """
647
    bad = False
648

    
649
    for node in node_vol_is:
650
      for volume in node_vol_is[node]:
651
        if node not in node_vol_should or volume not in node_vol_should[node]:
652
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
653
                      (volume, node))
654
          bad = True
655
    return bad
656

    
657
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
658
    """Verify the list of running instances.
659

660
    This checks what instances are running but unknown to the cluster.
661

662
    """
663
    bad = False
664
    for node in node_instance:
665
      for runninginstance in node_instance[node]:
666
        if runninginstance not in instancelist:
667
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
668
                          (runninginstance, node))
669
          bad = True
670
    return bad
671

    
672
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    Args:
      node_info: per-node dictionary; the 'mfree' and 'sinst-by-pnode'
                 entries of each node are used
      instance_cfg: dictionary of instance configuration objects, indexed
                    by instance name
      feedback_fn: function used to report the problems found

    Returns:
      True if a node lacks the memory for the failovers, False otherwise

    """
    bad = False

    for node, nodeinfo in node_info.items():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].items():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          # only the auto-balanced instances are counted
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad
701

    
702
  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    The set is stored in self.skip_set; errors.OpPrereqError is raised if
    it is not a subset of constants.VERIFY_OPTIONAL_CHECKS.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
712

    
713
  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    Returns an empty environment, no pre-phase nodes and all the nodes of
    the cluster for the post phase.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes
724

    
725
  def Exec(self, feedback_fn):
726
    """Verify integrity of cluster, performing various test on nodes.
727

728
    """
729
    bad = False
730
    feedback_fn("* Verifying global settings")
731
    for msg in self.cfg.VerifyConfig():
732
      feedback_fn("  - ERROR: %s" % msg)
733

    
734
    vg_name = self.cfg.GetVGName()
735
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
736
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
737
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
738
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
739
    i_non_redundant = [] # Non redundant instances
740
    i_non_a_balanced = [] # Non auto-balanced instances
741
    node_volume = {}
742
    node_instance = {}
743
    node_info = {}
744
    instance_cfg = {}
745

    
746
    # FIXME: verify OS list
747
    # do local checksums
748
    file_names = []
749
    file_names.append(constants.SSL_CERT_FILE)
750
    file_names.append(constants.CLUSTER_CONF_FILE)
751
    local_checksums = utils.FingerprintFiles(file_names)
752

    
753
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
754
    all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
755
    all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
756
    all_vglist = self.rpc.call_vg_list(nodelist)
757
    node_verify_param = {
758
      'filelist': file_names,
759
      'nodelist': nodelist,
760
      'hypervisor': hypervisors,
761
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
762
                        for node in nodeinfo]
763
      }
764
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
765
                                           self.cfg.GetClusterName())
766
    all_rversion = self.rpc.call_version(nodelist)
767
    all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
768
                                        self.cfg.GetHypervisorType())
769

    
770
    cluster = self.cfg.GetClusterInfo()
771
    for node in nodelist:
772
      feedback_fn("* Verifying node %s" % node)
773
      result = self._VerifyNode(node, file_names, local_checksums,
774
                                all_vglist[node], all_nvinfo[node],
775
                                all_rversion[node], feedback_fn)
776
      bad = bad or result
777

    
778
      # node_volume
779
      volumeinfo = all_volumeinfo[node]
780

    
781
      if isinstance(volumeinfo, basestring):
782
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
783
                    (node, volumeinfo[-400:].encode('string_escape')))
784
        bad = True
785
        node_volume[node] = {}
786
      elif not isinstance(volumeinfo, dict):
787
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
788
        bad = True
789
        continue
790
      else:
791
        node_volume[node] = volumeinfo
792

    
793
      # node_instance
794
      nodeinstance = all_instanceinfo[node]
795
      if type(nodeinstance) != list:
796
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
797
        bad = True
798
        continue
799

    
800
      node_instance[node] = nodeinstance
801

    
802
      # node_info
803
      nodeinfo = all_ninfo[node]
804
      if not isinstance(nodeinfo, dict):
805
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
806
        bad = True
807
        continue
808

    
809
      try:
810
        node_info[node] = {
811
          "mfree": int(nodeinfo['memory_free']),
812
          "dfree": int(nodeinfo['vg_free']),
813
          "pinst": [],
814
          "sinst": [],
815
          # dictionary holding all instances this node is secondary for,
816
          # grouped by their primary node. Each key is a cluster node, and each
817
          # value is a list of instances which have the key as primary and the
818
          # current node as secondary.  this is handy to calculate N+1 memory
819
          # availability if you can only failover from a primary to its
820
          # secondary.
821
          "sinst-by-pnode": {},
822
        }
823
      except ValueError:
824
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
825
        bad = True
826
        continue
827

    
828
    node_vol_should = {}
829

    
830
    for instance in instancelist:
831
      feedback_fn("* Verifying instance %s" % instance)
832
      inst_config = self.cfg.GetInstanceInfo(instance)
833
      result =  self._VerifyInstance(instance, inst_config, node_volume,
834
                                     node_instance, feedback_fn)
835
      bad = bad or result
836

    
837
      inst_config.MapLVsByNode(node_vol_should)
838

    
839
      instance_cfg[instance] = inst_config
840

    
841
      pnode = inst_config.primary_node
842
      if pnode in node_info:
843
        node_info[pnode]['pinst'].append(instance)
844
      else:
845
        feedback_fn("  - ERROR: instance %s, connection to primary node"
846
                    " %s failed" % (instance, pnode))
847
        bad = True
848

    
849
      # If the instance is non-redundant we cannot survive losing its primary
850
      # node, so we are not N+1 compliant. On the other hand we have no disk
851
      # templates with more than one secondary so that situation is not well
852
      # supported either.
853
      # FIXME: does not support file-backed instances
854
      if len(inst_config.secondary_nodes) == 0:
855
        i_non_redundant.append(instance)
856
      elif len(inst_config.secondary_nodes) > 1:
857
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
858
                    % instance)
859

    
860
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
861
        i_non_a_balanced.append(instance)
862

    
863
      for snode in inst_config.secondary_nodes:
864
        if snode in node_info:
865
          node_info[snode]['sinst'].append(instance)
866
          if pnode not in node_info[snode]['sinst-by-pnode']:
867
            node_info[snode]['sinst-by-pnode'][pnode] = []
868
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
869
        else:
870
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
871
                      " %s failed" % (instance, snode))
872

    
873
    feedback_fn("* Verifying orphan volumes")
874
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
875
                                       feedback_fn)
876
    bad = bad or result
877

    
878
    feedback_fn("* Verifying remaining instances")
879
    result = self._VerifyOrphanInstances(instancelist, node_instance,
880
                                         feedback_fn)
881
    bad = bad or result
882

    
883
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
884
      feedback_fn("* Verifying N+1 Memory redundancy")
885
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
886
      bad = bad or result
887

    
888
    feedback_fn("* Other Notes")
889
    if i_non_redundant:
890
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
891
                  % len(i_non_redundant))
892

    
893
    if i_non_a_balanced:
894
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
895
                  % len(i_non_a_balanced))
896

    
897
    return not bad
898

    
899
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result, handle it, and send some
    nicely-formatted feedback back to the user.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: previous Exec result

    Returns:
      lu_result unchanged for any phase other than POST, and for the
      POST phase when no problem was found; False as soon as a
      communication failure or a failing hook script is seen. Exec
      reports success as a true value, so a failure must be a false one.

    """
    # We only really run POST phase hooks, and are only interested in
    # their results; any other phase hands the previous result back.
    if phase != constants.HOOKS_PHASE_POST:
      return lu_result

    # Used to change hooks' output to proper indentation
    indent_re = re.compile('^', re.M)
    feedback_fn("* Hooks Results")
    if not hooks_results:
      feedback_fn("  - ERROR: general communication failure")
      return False

    for node_name in hooks_results:
      show_node_header = True
      res = hooks_results[node_name]
      # anything that is not a list (including False) means the node
      # could not be reached or returned garbage
      if not isinstance(res, list):
        feedback_fn("    Communication failure")
        lu_result = False
        continue
      for script, hkr, output in res:
        if hkr != constants.HKR_FAIL:
          continue
        # The node header is only shown once, if there are
        # failing hooks on that node
        if show_node_header:
          feedback_fn("  Node %s:" % node_name)
          show_node_header = False
        feedback_fn("    ERROR: Script %s failed, output:" % script)
        output = indent_re.sub('      ', output)
        feedback_fn("%s" % output)
        lu_result = False

    return lu_result
940

    
941

    
942
class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    """Request all node and instance locks, every level in shared mode.

    """
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    Only instances whose status is "up" and whose disk template is in
    constants.DTS_NET_MIRROR are examined.

    Returns:
      a tuple (res_nodes, res_nlvm, res_instances, res_missing):
        res_nodes: list of nodes whose volume list was not a dict
        res_nlvm: dict of node name to the error string it returned
        res_instances: list of instance names with a non-online LV
        res_missing: dict of instance name to the list of (node, volume)
          pairs that were expected but not reported

    """
    # 'result' aliases the four containers, so filling them below also
    # fills the returned tuple
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_volume_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
        # an error string carries no volume data; without skipping here
        # the loop below would call iteritems() on a string
        continue
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, _, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result
1020

    
1021

    
1022
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    The hooks run on the master node only, both before and after.

    """
    master = self.cfg.GetMasterNode()
    hook_env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    return hook_env, [master], [master]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    The name is resolved; the resolved IP is remembered in self.ip and
    the resolved name is written back to self.op.name.

    Raises:
      errors.OpPrereqError: if neither name nor IP would change, or if
        the new IP already answers on the node daemon port

    """
    resolved = utils.HostInfo(self.op.name)

    new_name = resolved.name
    new_ip = resolved.ip
    self.ip = new_ip

    current_name = self.cfg.GetClusterName()
    current_ip = self.cfg.GetMasterIP()
    ip_changed = new_ip != current_ip

    if new_name == current_name and not ip_changed:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    # a new address that already responds is in use by something else
    if ip_changed and utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                 " reachable on the network. Aborting." %
                                 new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    The master role is stopped, the new name and IP are stored and
    pushed to the other nodes, and the master role is started again
    even if the update fails.

    """
    new_name = self.op.name
    new_ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    if not self.rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      # TODO: sstore
      # NOTE(review): 'ss' is not defined anywhere in this block;
      # presumably a leftover simple-store handle - confirm
      ss.SetKey(ss.SS_MASTER_IP, new_ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, new_name)

      # Distribute updated ss config to all nodes
      master_info = self.cfg.GetNodeInfo(master)
      targets = self.cfg.GetNodeList()
      if master_info.name in targets:
        targets.remove(master_info.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        upload = self.rpc.call_upload_file(targets, fname)
        failed = [name for name in targets if not upload[name]]
        for to_node in failed:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))
    finally:
      if not self.rpc.call_node_start_master(master, False):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")
1098

    
1099

    
1100
def _RecursiveCheckIfLVMBased(disk):
  """Tell whether a disk, or anything below it, is an LVM volume.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    True if the disk itself or any disk reachable through its children
    has dev_type constants.LD_LV, False otherwise

  """
  if disk.dev_type == constants.LD_LV:
    return True
  # children may be empty or None for leaf devices
  for child in disk.children or ():
    if _RecursiveCheckIfLVMBased(child):
      return True
  return False
1115

    
1116

    
1117
class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    """Request all node locks, in shared mode.

    """
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.share_locks[locking.LEVEL_NODE] = 1
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }

  def BuildHooksEnv(self):
    """Build hooks env.

    The hooks run on the master node only, both before and after.

    """
    master = self.cfg.GetMasterNode()
    hook_env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    return hook_env, [master], [master]

  def CheckPrereq(self):
    """Check prerequisites.

    With an empty volume group name, no instance may still have an
    lvm-based disk. With a non-empty one, the volume group is checked
    on every locked node.

    Raises:
      errors.OpPrereqError: if either check fails

    """
    # FIXME: This only works because there is only one parameter that can be
    # changed or removed.
    vg_name = self.op.vg_name

    if not vg_name:
      for inst in self.cfg.GetAllInstancesInfo().values():
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")
      return

    # a volume group was given: check it on all nodes
    node_list = self.acquired_locks[locking.LEVEL_NODE]
    vglist = self.rpc.call_vg_list(node_list)
    for node in node_list:
      problem = utils.CheckVolumeGroupSize(vglist[node], vg_name,
                                           constants.MIN_VG_SIZE)
      if problem:
        raise errors.OpPrereqError("Error on node '%s': %s" %
                                   (node, problem))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    The volume group name is only written when it differs from the
    configured one.

    """
    wanted_vg = self.op.vg_name
    if wanted_vg == self.cfg.GetVGName():
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")
    else:
      self.cfg.SetVGName(wanted_vg)
1182

    
1183

    
1184
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1185
  """Sleep and poll for an instance's disk to sync.
1186

1187
  """
1188
  if not instance.disks:
1189
    return True
1190

    
1191
  if not oneshot:
1192
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1193

    
1194
  node = instance.primary_node
1195

    
1196
  for dev in instance.disks:
1197
    lu.cfg.SetDiskID(dev, node)
1198

    
1199
  retries = 0
1200
  while True:
1201
    max_time = 0
1202
    done = True
1203
    cumul_degraded = False
1204
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1205
    if not rstats:
1206
      lu.proc.LogWarning("Can't get any data from node %s" % node)
1207
      retries += 1
1208
      if retries >= 10:
1209
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1210
                                 " aborting." % node)
1211
      time.sleep(6)
1212
      continue
1213
    retries = 0
1214
    for i in range(len(rstats)):
1215
      mstat = rstats[i]
1216
      if mstat is None:
1217
        lu.proc.LogWarning("Can't compute data for node %s/%s" %
1218
                           (node, instance.disks[i].iv_name))
1219
        continue
1220
      # we ignore the ldisk parameter
1221
      perc_done, est_time, is_degraded, _ = mstat
1222
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1223
      if perc_done is not None:
1224
        done = False
1225
        if est_time is not None:
1226
          rem_time = "%d estimated seconds remaining" % est_time
1227
          max_time = est_time
1228
        else:
1229
          rem_time = "no time estimate"
1230
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1231
                        (instance.disks[i].iv_name, perc_done, rem_time))
1232
    if done or oneshot:
1233
      break
1234

    
1235
    time.sleep(min(60, max_time))
1236

    
1237
  if done:
1238
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1239
  return not cumul_degraded
1240

    
1241

    
1242
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1243
  """Check that mirrors are not degraded.
1244

1245
  The ldisk parameter, if True, will change the test from the
1246
  is_degraded attribute (which represents overall non-ok status for
1247
  the device(s)) to the ldisk (representing the local storage status).
1248

1249
  """
1250
  lu.cfg.SetDiskID(dev, node)
1251
  if ldisk:
1252
    idx = 6
1253
  else:
1254
    idx = 5
1255

    
1256
  result = True
1257
  if on_primary or dev.AssembleOnSecondary():
1258
    rstats = lu.rpc.call_blockdev_find(node, dev)
1259
    if not rstats:
1260
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1261
      result = False
1262
    else:
1263
      result = result and (not rstats[idx])
1264
  if dev.children:
1265
    for child in dev.children:
1266
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1267

    
1268
  return result
1269

    
1270

    
1271
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    """Validate the requested fields and request all node locks.

    Raises:
      errors.OpPrereqError: if specific OS names were requested

    """
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
    _CheckOutputFields(static=[],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    self.share_locks[locking.LEVEL_NODE] = 1
    self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}

  def CheckPrereq(self):
    """Check prerequisites.

    """

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

      Args:
        node_list: a list with the names of all nodes
        rlist: a map with node names as keys and OS objects as values

      Returns:
        map: a map with osnames as keys and as value another map, with
             nodes as
             keys and list of OS objects as values
             e.g. {"debian-etch": {"node1": [<object>,...],
                                   "node2": [<object>,]}
                  }

    """
    by_os = {}
    for node_name, os_objs in rlist.iteritems():
      # nodes that returned nothing contribute no entries
      if not os_objs:
        continue
      for os_obj in os_objs:
        if os_obj.name not in by_os:
          # first time we see this os: start every node in node_list
          # with an empty list
          by_os[os_obj.name] = dict([(nname, []) for nname in node_list])
        by_os[os_obj.name][node_name].append(os_obj)
    return by_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    Returns:
      a list with one row per OS name, each row holding the values of
      the requested output fields in the requested order

    Raises:
      errors.OpExecError: if the diagnose call returned False
      errors.ParameterError: on an unknown output field

    """
    node_list = self.acquired_locks[locking.LEVEL_NODE]
    node_data = self.rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")

    rows = []
    for os_name, per_node in self._DiagnoseByOS(node_list,
                                                node_data).iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          value = os_name
        elif field == "valid":
          # every node must have an entry, and its first entry must be
          # a true value
          value = utils.all([osl and osl[0] for osl in per_node.values()])
        elif field == "node_status":
          value = {}
          for node_name, nos_list in per_node.iteritems():
            value[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(value)
      rows.append(row)

    return rows
1355

    
1356

    
1357
class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    # guard the removal: list.remove raises ValueError on a missing name
    if self.op.node_name in all_nodes:
      all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    On success the expanded node name is written back to
    self.op.node_name and the node object is kept in self.node.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    The node is first dropped through the context, then told to leave
    the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    self.context.RemoveNode(node.name)

    self.rpc.call_node_leave_cluster(node.name)
1424

    
1425

    
1426
class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    """Validate the requested fields and decide which nodes to lock.

    Node locks (shared) are only requested when at least one requested
    field is not a static one.

    """
    self.dynamic_fields = frozenset([
      "dtotal", "dfree",
      "mtotal", "mnode", "mfree",
      "bootid",
      "ctotal",
      ])

    self.static_fields = frozenset([
      "name", "pinst_cnt", "sinst_cnt",
      "pinst_list", "sinst_list",
      "pip", "sip", "tags",
      "serial_no",
      ])

    _CheckOutputFields(static=self.static_fields,
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if not self.op.names:
      self.wanted = locking.ALL_SET
    else:
      self.wanted = _GetWantedNodes(self, self.op.names)

    only_static = self.static_fields.issuperset(self.op.output_fields)
    self.do_locking = not only_static
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in the _GetWantedNodes,
    # if non empty, and if empty, there's no validation to do
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    Returns:
      a list with one row per node (in NiceSort order of the names),
      each row holding the values of the requested output fields

    Raises:
      errors.OpExecError: if, without locking, a requested node is no
        longer in the configuration
      errors.ParameterError: on an unknown output field

    """
    requested = self.op.output_fields
    all_info = self.cfg.GetAllNodesInfo()

    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      nodenames = self.wanted
      # without locks the nodes may have disappeared in the meantime
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
    else:
      nodenames = all_info.keys()

    nodenames = utils.NiceSort(nodenames)
    nodelist = [all_info[name] for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(requested):
      live_data = {}
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                          self.cfg.GetHypervisorType())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if not nodeinfo:
          # no answer from this node: every dynamic field will be None
          live_data[name] = {}
          continue
        live_data[name] = {
          "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
          "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
          "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
          "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
          "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
          "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
          "bootid": nodeinfo['bootid'],
          }
    else:
      # the single empty dict is shared by all names; it is only read
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields.intersection(requested):
      for instance_name in self.cfg.GetInstanceList():
        inst = self.cfg.GetInstanceInfo(instance_name)
        pnode = inst.primary_node
        if pnode in node_to_primary:
          node_to_primary[pnode].add(inst.name)
        for snode in inst.secondary_nodes:
          if snode in node_to_secondary:
            node_to_secondary[snode].add(inst.name)

    # end data gathering

    rows = []
    for node in nodelist:
      row = []
      for field in requested:
        if field == "name":
          value = node.name
        elif field == "pip":
          value = node.primary_ip
        elif field == "sip":
          value = node.secondary_ip
        elif field == "tags":
          value = list(node.GetTags())
        elif field == "serial_no":
          value = node.serial_no
        elif field == "pinst_cnt":
          value = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          value = len(node_to_secondary[node.name])
        elif field == "pinst_list":
          value = list(node_to_primary[node.name])
        elif field == "sinst_list":
          value = list(node_to_secondary[node.name])
        elif field in self.dynamic_fields:
          value = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        row.append(value)
      rows.append(row)

    return rows
1564

    
1565

    
1566
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False

  def ExpandNames(self):
    """Validate the requested fields and request shared node locks.

    All nodes are locked when no node names were given, otherwise only
    the requested ones.

    """
    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

    self.share_locks[locking.LEVEL_NODE] = 1
    self.needed_locks = {}
    if self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)
    else:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    This only records the locked nodes as the nodes to query.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of volumes on the nodes and their attributes.

    Returns:
      a list with one row per volume, each row holding the requested
      output fields converted to strings; nodes without volume data
      are left out

    Raises:
      errors.ParameterError: on an unknown output field

    """
    nodenames = self.nodes
    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    rows = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      # sort a copy, leaving the rpc result untouched
      node_vols = sorted(volumes[node], key=lambda vol: vol['dev'])

      for vol in node_vols:
        row = []
        for field in self.op.output_fields:
          if field == "node":
            value = node
          elif field == "phys":
            value = vol['dev']
          elif field == "vg":
            value = vol['vg']
          elif field == "name":
            value = vol['name']
          elif field == "size":
            value = int(float(vol['size']))
          elif field == "instance":
            # first instance having this volume on this node, or '-'
            value = '-'
            for inst in ilist:
              inst_lvs = lv_by_node[inst]
              if node in inst_lvs and vol['name'] in inst_lvs[node]:
                value = inst.name
                break
          else:
            raise errors.ParameterError(field)
          row.append(str(value))

        rows.append(row)

    return rows
1643

    
1644

    
1645
class LUAddNode(LogicalUnit):
  """Logical unit for adding a node to the cluster.

  The same unit handles re-adding a node that is already present in
  the configuration, selected through C{self.op.readd}.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build the hooks environment.

    The first node list returned is the current node list, the second
    one is the same list plus the node being added.

    """
    target = self.op.node_name
    hook_env = {
      "OP_TARGET": target,
      "NODE_NAME": target,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    pre_nodes = self.cfg.GetNodeList()
    post_nodes = pre_nodes + [target, ]
    return hook_env, pre_nodes, post_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    The checks done, in order:
     - the name resolves, and the secondary IP (defaulting to the
       primary one) is a valid IP
     - the node is not yet in the configuration (or, for a re-add,
       that it is, with unchanged IP addresses)
     - its IP addresses do not clash with any other node
     - it is single/dual homed like the master node
     - it answers a TCP ping on the node daemon port (also on the
       secondary IP for dual-homed nodes)

    On success the op's C{primary_ip}/C{secondary_ip} are filled in
    and the node object to add is stored in C{self.new_node}.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    cfg = self.cfg
    resolved = utils.HostInfo(self.op.node_name)

    node = resolved.name
    primary_ip = resolved.ip
    self.op.primary_ip = primary_ip

    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    already_known = node in node_list
    if self.op.readd:
      if not already_known:
        raise errors.OpPrereqError("Node %s is not in the configuration" %
                                   node)
    elif already_known:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)
      existing_ips = (existing_node.primary_ip, existing_node.secondary_ip)

      if self.op.readd and node == existing_node_name:
        # a re-added node must come back with exactly its old addresses
        if existing_ips != (primary_ip, secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if primary_ip in existing_ips or secondary_ip in existing_ips:
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # the new node must be single/dual homed exactly like the master
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed and not newbie_singlehomed:
      raise errors.OpPrereqError("The master has no private ip but the"
                                 " new node has one")
    if newbie_singlehomed and not master_singlehomed:
      raise errors.OpPrereqError("The master has a private ip but the"
                                 " new node doesn't have one")

    # check reachability of the node daemon port on the primary ip
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    # and, for dual-homed nodes, from our secondary ip to the new one's
    if (not newbie_singlehomed and
        not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                          source=myself.secondary_ip)):
      raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                 " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    Steps: protocol version check, transfer of the ssh host and user
    keys, /etc/hosts update, secondary IP check, ssh/hostname
    verification run from the master, distribution of the hosts and
    known_hosts files (plus the VNC password file when Xen HVM is
    enabled) and finally registration of the node in the context.

    @param feedback_fn: function used to report verification failures
    @raise errors.OpExecError: if any of the mandatory steps fails

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    remote_version = self.rpc.call_version([node])[node]
    if not remote_version:
      raise errors.OpExecError("Cannot get version from the new node")
    if constants.PROTOCOL_VERSION != remote_version:
      raise errors.OpExecError("Version mismatch master version %s,"
                               " node version %s" %
                               (constants.PROTOCOL_VERSION, remote_version))
    logger.Info("communication to node %s fine, sw version %s match" %
                (node, remote_version))

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    key_paths = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                 priv_key, pub_key]

    key_contents = []
    for path in key_paths:
      key_file = open(path, 'r')
      try:
        key_contents.append(key_file.read())
      finally:
        key_file.close()

    # the six file contents are passed in the order of key_paths
    if not self.rpc.call_node_add(node, *key_contents):
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    if (new_node.secondary_ip != new_node.primary_ip and
        not self.rpc.call_node_has_ip_address(new_node.name,
                                              new_node.secondary_ip)):
      raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                               " you gave (%s). Please fix and re-run this"
                               " command." % new_node.secondary_ip)

    verifiers = [self.cfg.GetMasterNode()]
    verify_what = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
    }

    verify_result = self.rpc.call_node_verify(verifiers, verify_what,
                                              self.cfg.GetClusterName())
    for verifier in verifiers:
      if not verify_result[verifier]:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                 " for remote verification" % verifier)
      failures = verify_result[verifier]['nodelist']
      if failures:
        for failed in failures:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, failures[failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      upload_result = self.rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if upload_result[to_node]:
          continue
        # a failed copy is only logged, it does not abort the operation
        logger.Error("copy of file %s to node %s failed" %
                     (fname, to_node))

    to_copy = []
    if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      upload_result = self.rpc.call_upload_file([node], fname)
      if not upload_result[node]:
        logger.Error("could not copy file %s to node %s" % (fname, node))

    if self.op.readd:
      self.context.ReaddNode(new_node)
    else:
      self.context.AddNode(new_node)
1845

    
1846

    
1847
class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False
  REQ_BGL = False

  def ExpandNames(self):
    """Declare an empty set of needed locks.

    """
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites are needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return the cluster information.

    @param feedback_fn: unused by this method
    @return: dict with the cluster name, the software, protocol,
        config, OS API and export versions taken from the constants
        module, the master node, the local architecture and the
        hypervisor settings

    """
    cfg = self.cfg
    info = {}
    info["name"] = cfg.GetClusterName()
    info["software_version"] = constants.RELEASE_VERSION
    info["protocol_version"] = constants.PROTOCOL_VERSION
    info["config_version"] = constants.CONFIG_VERSION
    info["os_api_version"] = constants.OS_API_VERSION
    info["export_version"] = constants.EXPORT_VERSION
    info["master"] = cfg.GetMasterNode()
    # architecture of the machine this code runs on
    info["architecture"] = (platform.architecture()[0], platform.machine())
    info["hypervisor_type"] = cfg.GetHypervisorType()
    info["enabled_hypervisors"] = cfg.GetClusterInfo().enabled_hypervisors

    return info
1882

    
1883

    
1884
class LUQueryConfigValues(NoHooksLU):
  """Return configuration values.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    """Declare no locks and validate the requested output fields.

    """
    self.needed_locks = {}

    _CheckOutputFields(static=["cluster_name", "master_node"],
                       dynamic=[],
                       selected=self.op.output_fields)

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Return the values of the requested configuration fields.

    @param feedback_fn: unused by this method
    @return: list with one value per entry of C{self.op.output_fields},
        in the same order
    @raise errors.ParameterError: if an unknown field is requested

    """
    cfg = self.cfg
    values = []
    for field in self.op.output_fields:
      if field == "cluster_name":
        value = cfg.GetClusterName()
      elif field == "master_node":
        value = cfg.GetMasterNode()
      else:
        raise errors.ParameterError(field)
      values.append(value)
    return values
1918

    
1919

    
1920
class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    """Lock the instance; node locks are computed in DeclareLocks.

    """
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    """At the node level, request the locks for the instance's nodes.

    """
    if level != locking.LEVEL_NODE:
      return
    self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This looks up the (already locked) instance in the configuration
    and stores it in C{self.instance}.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    self.instance = instance
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Activate the disks.

    @param feedback_fn: unused by this method
    @return: the device information computed by _AssembleInstanceDisks
    @raise errors.OpExecError: if the disks could not be assembled

    """
    assembled_ok, device_info = _AssembleInstanceDisks(self, self.instance)
    if assembled_ok:
      return device_info

    raise errors.OpExecError("Cannot activate block devices")
1955

    
1956

    
1957
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes, in two passes: first
  every device is assembled in secondary mode on all nodes of its
  tree, then the devices on the primary node are assembled again in
  primary mode.

  Args:
    lu: the logical unit on whose behalf we execute (its cfg and rpc
        attributes are used)
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors in the first pass won't result
                        in an error return from the function

  Returns:
    a tuple (disks_ok, device_info), where disks_ok is false if the
    operation failed and device_info is a list with one
    (primary_node, instance_visible_name, assemble_result) tuple per
    instance disk

  """
  primary = instance.primary_node
  inst_name = instance.name
  all_ok = True
  mapping = []

  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for disk in instance.disks:
    for node, node_disk in disk.ComputeNodeTree(primary):
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, inst_name,
                                             False)
      if result:
        continue
      logger.Error("could not prepare block device %s on node %s"
                   " (is_primary=False, pass=1)" % (disk.iv_name, node))
      if not ignore_secondaries:
        all_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for disk in instance.disks:
    for node, node_disk in disk.ComputeNodeTree(primary):
      if node == primary:
        lu.cfg.SetDiskID(node_disk, node)
        result = lu.rpc.call_blockdev_assemble(node, node_disk, inst_name,
                                               True)
        if not result:
          logger.Error("could not prepare block device %s on node %s"
                       " (is_primary=True, pass=2)" % (disk.iv_name, node))
          all_ok = False
    # NOTE(review): 'result' is whatever the most recent assemble call
    # returned; if a disk's tree had no entry for the primary node this
    # would record a value left over from an earlier call
    mapping.append((primary, disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    lu.cfg.SetDiskID(disk, primary)

  return all_ok, mapping
2017

    
2018

    
2019
def _StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  The disks are assembled through _AssembleInstanceDisks, with force
  passed as its ignore_secondaries argument. If that fails, the disks
  are shut down again and an error is raised.

  @param lu: the logical unit on whose behalf we execute
  @param instance: the instance whose disks should be started
  @param force: passed on as ignore_secondaries; when it is false but
      not None, a hint about '--force' is logged on failure
  @raise errors.OpExecError: if the disks could not be assembled

  """
  assembled_ok = _AssembleInstanceDisks(lu, instance,
                                        ignore_secondaries=force)[0]
  if assembled_ok:
    return

  _ShutdownInstanceDisks(lu, instance)
  # None means the caller has no force option to suggest
  if not (force is None or force):
    logger.Error("If the message above refers to a secondary node,"
                 " you can retry the operation using '--force'.")
  raise errors.OpExecError("Disk consistency error")
2031

    
2032

    
2033
class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    """Lock the instance; node locks are computed in DeclareLocks.

    """
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    """At the node level, request the locks for the instance's nodes.

    """
    if level != locking.LEVEL_NODE:
      return
    self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This looks up the (already locked) instance in the configuration
    and stores it in C{self.instance}.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    self.instance = instance
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    The work is delegated to _SafeShutdownInstanceDisks.

    """
    _SafeShutdownInstanceDisks(self, self.instance)
2065

    
2066

    
2067
def _SafeShutdownInstanceDisks(lu, instance):
  """Shutdown block devices of an instance, unless it is running.

  The primary node is asked for its list of running instances; only
  if the instance is not in that list _ShutdownInstanceDisks is
  called.

  @param lu: the logical unit on whose behalf we execute
  @param instance: the instance whose disks should be shut down
  @raise errors.OpExecError: if the primary node did not return a
      list, or if the instance is running

  """
  pnode = instance.primary_node
  answer = lu.rpc.call_instance_list([pnode], [instance.hypervisor])
  running = answer[pnode]

  # anything but a list is treated as a failure to contact the node
  if type(running) is not list:
    raise errors.OpExecError("Can't contact node '%s'" %
                             instance.primary_node)

  if instance.name in running:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")

  _ShutdownInstanceDisks(lu, instance)
2086

    
2087

    
2088
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are logged
  but do not affect the return value; errors on any other node always
  do.

  @param lu: the logical unit on whose behalf we execute
  @param instance: the instance whose disks should be shut down
  @param ignore_primary: whether to ignore primary node errors
  @return: False if a shutdown error that is not ignored occurred,
      True otherwise

  """
  all_ok = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      if lu.rpc.call_blockdev_shutdown(node, top_disk):
        continue
      logger.Error("could not shutdown block device %s on node %s" %
                   (disk.iv_name, node))
      ignorable = ignore_primary and node == instance.primary_node
      if not ignorable:
        all_ok = False
  return all_ok
2107

    
2108

    
2109
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
  """Checks if a node has enough free memory.

  This function check if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raise an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node: C{str}
  @param node: the node to check
  @type reason: C{str}
  @param reason: string to use in the error message
  @type requested: C{int}
  @param requested: the amount of memory in MiB to check for
  @type hypervisor: C{str}
  @param hypervisor: the hypervisor to ask for memory stats
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
      we cannot check the node

  """
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                               " information" % (node,))

  # The overall answer being a dict does not guarantee a usable entry
  # for our node: a missing key or a non-dict value must be reported as
  # a prerequisite failure instead of escaping as KeyError/AttributeError
  node_data = nodeinfo.get(node)
  if not isinstance(node_data, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                               " information" % (node,))

  free_mem = node_data.get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem))
2144

    
2145

    
2146
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    """Lock the instance; node locks are computed in DeclareLocks.

    """
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    """At the node level, request the locks for the instance's nodes.

    """
    if level != locking.LEVEL_NODE:
      return
    self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    inst = self.instance
    hook_env = {"FORCE": self.op.force}
    hook_env.update(_BuildInstanceHookEnvByObject(self, inst))
    hook_nodes = [self.cfg.GetMasterNode(), inst.primary_node]
    hook_nodes = hook_nodes + list(inst.secondary_nodes)
    return hook_env, hook_nodes, hook_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This looks up the (already locked) instance, then checks that its
    bridges exist and that the primary node has enough free memory
    for the instance's memory backend parameter.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    self.instance = instance
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # instance backend parameters as filled in by the cluster object
    bep = self.cfg.GetClusterInfo().FillBE(instance)

    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

    needed_mem = bep[constants.BE_MEMORY]
    _CheckNodeFreeMemory(self, instance.primary_node,
                         "starting instance %s" % instance.name,
                         needed_mem, instance.hypervisor)

  def Exec(self, feedback_fn):
    """Start the instance.

    The instance is marked up in the configuration before its disks
    are started; if the start RPC fails, the disks are shut down
    again.

    @param feedback_fn: unused by this method
    @raise errors.OpExecError: if the instance could not be started

    """
    instance = self.instance
    extra_args = getattr(self.op, "extra_args", "")

    self.cfg.MarkInstanceUp(instance.name)

    pnode = instance.primary_node

    _StartInstanceDisks(self, instance, self.op.force)

    if self.rpc.call_instance_start(pnode, instance, extra_args):
      return

    _ShutdownInstanceDisks(self, instance)
    raise errors.OpExecError("Could not start instance")
2213

    
2214

    
2215
class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    """Validate the reboot type and lock the instance.

    Node locks are computed later, in DeclareLocks.

    @raise errors.ParameterError: if the reboot type is not one of the
        soft, hard or full constants

    """
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    """At the node level, request the locks for the instance's nodes.

    Only a full reboot touches the disks on all nodes (see Exec), so
    for soft and hard reboots primary_only is requested.

    """
    if level == locking.LEVEL_NODE:
      # The decision must depend on the requested reboot type; negating
      # the constant itself gives the same value for every reboot type
      primary_only = self.op.reboot_type != constants.INSTANCE_REBOOT_FULL
      self._LockInstancesNodes(primary_only=primary_only)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This looks up the (already locked) instance and checks that its
    bridges exist.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    Soft and hard reboots are delegated to the primary node through
    the instance reboot RPC. A full reboot is done as instance
    shutdown, disk shutdown, disk start and instance start. In both
    cases the instance is marked up in the configuration at the end.

    @param feedback_fn: unused by this method
    @raise errors.OpExecError: if any of the steps fails

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      if not self.rpc.call_instance_reboot(node_current, instance,
                                           reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      if not self.rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(self, instance)
      _StartInstanceDisks(self, instance, ignore_secondaries)
      if not self.rpc.call_instance_start(node_current, instance, extra_args):
        # do not leave the disks active if the instance did not start
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)
2294

    
2295

    
2296
class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    """Lock the instance; node locks are computed in DeclareLocks.

    """
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    """At the node level, request the locks for the instance's nodes.

    """
    if level != locking.LEVEL_NODE:
      return
    self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    inst = self.instance
    hook_env = _BuildInstanceHookEnvByObject(self, inst)
    hook_nodes = [self.cfg.GetMasterNode(), inst.primary_node]
    hook_nodes = hook_nodes + list(inst.secondary_nodes)
    return hook_env, hook_nodes, hook_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This looks up the (already locked) instance in the configuration
    and stores it in C{self.instance}.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    self.instance = instance
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    The instance is marked down in the configuration first; a failure
    of the shutdown RPC is only logged, and the disks are shut down
    in any case.

    """
    instance = self.instance
    pnode = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)

    stopped = self.rpc.call_instance_shutdown(pnode, instance)
    if not stopped:
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(self, instance)
2346

    
2347

    
2348
class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    """Lock the instance; node locks are computed in DeclareLocks.

    """
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    """At the node level, request the locks for the instance's nodes.

    """
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster, has disks, is
    marked down and is not running on its primary node. If a new OS
    type was requested, it also checks that the primary node knows
    that OS.

    @raise errors.OpPrereqError: if any of the checks fails

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    # os_type is optional; normalise a missing attribute to None
    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        # report the node we actually looked up; this LU does not set
        # any 'pnode' attribute on the opcode
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
      os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node"  % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    If a new OS type was requested it is saved in the configuration
    first. The disks are then started, the OS add RPC is run on the
    primary node and the disks are shut down again, whether or not
    the RPC succeeded.

    @param feedback_fn: function used to report progress
    @raise errors.OpExecError: if the OS could not be installed

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.Update(inst)

    _StartInstanceDisks(self, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not self.rpc.call_instance_os_add(inst.primary_node, inst,
                                           "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(self, inst)
2437

    
2438

    
2439
class LURenameInstance(LogicalUnit):
  """Rename an instance.

  The rename is only accepted for an instance that is marked down and
  not reported as running by its primary node.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    The node list holds the master node followed by the primary and the
    secondary nodes of the instance; the same list is returned twice.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    hook_nodes = [self.cfg.GetMasterNode(), self.instance.primary_node]
    hook_nodes.extend(self.instance.secondary_nodes)
    return env, hook_nodes, hook_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    The instance has to be known to the configuration, be marked down and
    not be reported as running by its primary node. The new name is passed
    through utils.HostInfo and must not belong to an existing instance;
    unless the opcode sets ignore_ip, its IP must not answer a TCP ping on
    constants.DEFAULT_NODED_PORT.

    @raise errors.OpPrereqError: if any of the checks above fails

    """
    full_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    instance = self.cfg.GetInstanceInfo(full_name)
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    running_info = self.rpc.call_instance_info(instance.primary_node,
                                               instance.name,
                                               instance.hypervisor)
    if running_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification; the opcode is updated with the name reported
    # by HostInfo so that Exec works on that form
    host = utils.HostInfo(self.op.new_name)
    new_name = host.name
    self.op.new_name = new_name

    if new_name in self.cfg.GetInstanceList():
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    check_ip = not getattr(self.op, "ignore_ip", False)
    if check_ip and utils.TcpPing(host.ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                 (host.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    The configuration is updated first, then the instance lock is swapped,
    then (for file-based instances) the storage directory is renamed on
    the primary node, and finally the OS rename script is run with the
    instance disks started. A failing rename script is only logged.

    @raise errors.OpExecError: if the file storage directory could not be
        renamed; the configuration has already been changed at that point

    """
    instance = self.instance
    previous_name = instance.name
    target_name = self.op.new_name

    # remember the directory before the configuration changes
    if instance.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])

    self.cfg.RenameInstance(previous_name, target_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, previous_name)
    self.context.glm.add(locking.LEVEL_INSTANCE, target_name)

    # re-read the instance from the configuration after rename
    instance = self.cfg.GetInstanceInfo(target_name)

    if instance.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
      rename_result = self.rpc.call_file_storage_dir_rename(
        instance.primary_node, old_file_storage_dir, new_file_storage_dir)

      if not rename_result:
        # an empty/false result is reported as a connection problem
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 instance.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))
      elif not rename_result[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self, instance, None)
    try:
      script_ok = self.rpc.call_instance_run_rename(instance.primary_node,
                                                    instance, previous_name,
                                                    "sda", "sdb")
      if not script_ok:
        logger.Error("Could not run OS rename script for instance %s on node"
                     " %s (but the instance has been renamed in Ganeti)" %
                     (instance.name, instance.primary_node))
    finally:
      _ShutdownInstanceDisks(self, instance)
2545

    
2546

    
2547
class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  The instance is shut down, its disks are removed and it is dropped from
  the cluster configuration. With ignore_failures set in the opcode,
  shutdown and disk removal problems are reported as warnings only.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # node locks are filled in by DeclareLocks
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level != locking.LEVEL_NODE:
      return
    self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    The node list contains only the master node and is returned twice.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    master_only = [self.cfg.GetMasterNode()]
    return env, master_only, master_only

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    @param feedback_fn: callable receiving warning messages
    @raise errors.OpExecError: if the shutdown or the disk removal fails
        and the opcode does not ask to ignore failures

    """
    instance = self.instance
    tolerate_failures = self.op.ignore_failures

    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    stopped = self.rpc.call_instance_shutdown(instance.primary_node, instance)
    if not stopped:
      if not tolerate_failures:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))
      feedback_fn("Warning: can't shutdown instance")

    logger.Info("removing block devices for instance %s" % instance.name)

    disks_removed = _RemoveDisks(self, instance)
    if not disks_removed:
      if not tolerate_failures:
        raise errors.OpExecError("Can't remove instance's disks")
      feedback_fn("Warning: can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)
    # record the instance lock for removal
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2612

    
2613

    
2614
class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  Returns one row per instance, each row holding the values of the
  requested output fields in the order they were requested.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    """Validate the requested fields and declare the needed locks.

    Locks are only requested when at least one selected field is not a
    static (configuration-only) field.

    """
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    hvp = ["hv/%s" % name for name in constants.HVS_PARAMETERS]
    bep = ["be/%s" % name for name in constants.BES_PARAMETERS]
    # "beparams" is listed here because Exec has a branch for it; without
    # the entry the field could never pass the output field check below.
    # NOTE(review): "admin_ram" and "vcpus" are accepted here but have no
    # branch in Exec, so selecting them ends in errors.ParameterError.
    self.static_fields = frozenset([
      "name", "os", "pnode", "snodes",
      "admin_state", "admin_ram",
      "disk_template", "ip", "mac", "bridge",
      "sda_size", "sdb_size", "vcpus", "tags",
      "network_port",
      "serial_no", "hypervisor", "hvparams", "beparams",
      ] + hvp + bep)

    _CheckOutputFields(static=self.static_fields,
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.do_locking:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    Nothing to check for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    @return: a list with one entry per instance (in NiceSort order of the
        instance names); each entry is the list of values for the fields
        in self.op.output_fields
    @raise errors.OpExecError: if, when running without locks, some of
        the explicitly requested instances are no longer in the config
    @raise errors.ParameterError: for a field without a matching branch

    """
    all_info = self.cfg.GetAllInstancesInfo()
    if self.do_locking:
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
    elif self.wanted != locking.ALL_SET:
      instance_names = self.wanted
      missing = set(instance_names).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some instances were removed before retrieving their data: %s"
          % missing)
    else:
      instance_names = all_info.keys()

    instance_names = utils.NiceSort(instance_names)
    instance_list = [all_info[iname] for iname in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])
    hv_list = list(set([inst.hypervisor for inst in instance_list]))

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          # an explicit False (as opposed to an empty result) marks a node
          # that could not be queried
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    HVPREFIX = "hv/"
    BEPREFIX = "be/"
    # fetched once; it is only used to fill in per-instance parameters
    cluster = self.cfg.GetClusterInfo()
    output = []
    for instance in instance_list:
      iout = []
      i_hv = cluster.FillHV(instance)
      i_be = cluster.FillBE(instance)
      node_is_bad = instance.primary_node in bad_nodes
      admin_up = (instance.status != "down")
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = admin_up
        elif field == "oper_state":
          if node_is_bad:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if node_is_bad:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if admin_up:
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if admin_up:
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "oper_ram":
          if node_is_bad:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          # the first three characters of the field are the disk name
          disk = instance.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        elif field == "tags":
          val = list(instance.GetTags())
        elif field == "serial_no":
          val = instance.serial_no
        elif field == "network_port":
          val = instance.network_port
        elif field == "hypervisor":
          val = instance.hypervisor
        elif field == "hvparams":
          val = i_hv
        elif (field.startswith(HVPREFIX) and
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
          val = i_hv.get(field[len(HVPREFIX):], None)
        elif field == "beparams":
          val = i_be
        elif (field.startswith(BEPREFIX) and
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
          val = i_be.get(field[len(BEPREFIX):], None)
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output
2787

    
2788

    
2789
class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  The instance is shut down on its primary node and, if it is marked as
  up, started again on its first secondary node, which becomes the new
  primary node in the configuration.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # node locks are filled in by DeclareLocks
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level != locking.LEVEL_NODE:
      return
    self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    The node list holds the master node followed by the secondary nodes
    of the instance; the same list is returned twice.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    hook_nodes = [self.cfg.GetMasterNode()]
    hook_nodes.extend(self.instance.secondary_nodes)
    return env, hook_nodes, hook_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster, that its disk
    template is network mirrored, that the target node passes the free
    memory check and that the bridges of all instance NICs exist there.

    @raise errors.OpPrereqError: for a non-mirrored disk template or when
        the bridge check fails
    @raise errors.ProgrammerError: if a mirrored instance has no
        secondary node

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    self.instance = instance
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    if not instance.secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    # the first secondary node is the failover target
    target_node = instance.secondary_nodes[0]

    # check memory requirements on the secondary node
    reason = "failing over instance %s" % instance.name
    _CheckNodeFreeMemory(self, target_node, reason,
                         bep[constants.BE_MEMORY], instance.hypervisor)

    # check bridge existence
    bridges = [nic.bridge for nic in instance.nics]
    if not self.rpc.call_bridges_exist(target_node, bridges):
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (bridges, target_node))

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    @param feedback_fn: callable receiving progress messages
    @raise errors.OpExecError: if a disk is degraded on the target (for an
        instance marked up and without ignore_consistency), if the
        shutdown fails (without ignore_consistency), or if the disks or
        the instance cannot be brought up on the target node

    """
    instance = self.instance
    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]
    ignore_consistency = self.op.ignore_consistency

    feedback_fn("* checking disk consistency between source and target")
    for disk in instance.disks:
      # for drbd, these are drbd over lvm
      if _CheckDiskConsistency(self, disk, target_node, False):
        continue
      if instance.status == "up" and not ignore_consistency:
        raise errors.OpExecError("Disk %s is degraded on target node,"
                                 " aborting failover." % disk.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    stopped = self.rpc.call_instance_shutdown(source_node, instance)
    if not stopped:
      if not ignore_consistency:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))
      logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                   " anyway. Please make sure node %s is down"  %
                   (instance.name, source_node, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.status != "up":
      return

    feedback_fn("* activating the instance's disks on target node")
    logger.Info("Starting instance %s on node %s" %
                (instance.name, target_node))

    disks_ok, unused_info = _AssembleInstanceDisks(self, instance,
                                                   ignore_secondaries=True)
    if not disks_ok:
      _ShutdownInstanceDisks(self, instance)
      raise errors.OpExecError("Can't activate the instance's disks")

    feedback_fn("* starting the instance on the target node")
    started = self.rpc.call_instance_start(target_node, instance, None)
    if not started:
      _ShutdownInstanceDisks(self, instance)
      raise errors.OpExecError("Could not start instance %s on node %s." %
                               (instance.name, target_node))
2911

    
2912

    
2913
def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
2914
  """Create a tree of block devices on the primary node.
2915

2916
  This always creates all devices.
2917

2918
  """
2919
  if device.children:
2920
    for child in device.children:
2921
      if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
2922
        return False
2923

    
2924
  lu.cfg.SetDiskID(device, node)
2925
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
2926
                                       instance.name, True, info)
2927
  if not new_id:
2928
    return False
2929
  if device.physical_id is None:
2930
    device.physical_id = new_id
2931
  return True
2932

    
2933

    
2934
def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
2935
  """Create a tree of block devices on a secondary node.
2936

2937
  If this device type has to be created on secondaries, create it and
2938
  all its children.
2939

2940
  If not, just recurse to children keeping the same 'force' value.
2941

2942
  """
2943
  if device.CreateOnSecondary():
2944
    force = True
2945
  if device.children:
2946
    for child in device.children:
2947
      if not _CreateBlockDevOnSecondary(lu, node, instance,
2948
                                        child, force, info):
2949
        return False
2950

    
2951
  if not force:
2952
    return True
2953
  lu.cfg.SetDiskID(device, node)
2954
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
2955
                                       instance.name, False, info)
2956
  if not new_id:
2957
    return False
2958
  if device.physical_id is None:
2959
    device.physical_id = new_id
2960
  return True
2961

    
2962

    
2963
def _GenerateUniqueNames(lu, exts):
2964
  """Generate a suitable LV name.
2965

2966
  This will generate a logical volume name for the given instance.
2967

2968
  """
2969
  results = []
2970
  for val in exts:
2971
    new_id = lu.cfg.GenerateUniqueID()
2972
    results.append("%s%s" % (new_id, val))
2973
  return results
2974

    
2975

    
2976
def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
                         p_minor, s_minor):
  """Build a drbd8 disk object together with its two LV children.

  The port, the volume group name and the shared secret are obtained
  from the configuration. The first child is the data volume of the
  requested size, the second one a 128-sized metadata volume.

  Args:
    lu: the logical unit whose cfg object is used
    primary: first node of the drbd logical_id
    secondary: second node of the drbd logical_id
    size: size of the drbd device and of its data volume
    names: names of the data (index 0) and metadata (index 1) volumes
    iv_name: the iv_name given to the drbd device
    p_minor: minor stored for the primary node
    s_minor: minor stored for the secondary node

  Returns:
    the objects.Disk describing the drbd8 device

  """
  port = lu.cfg.AllocatePort()
  vgname = lu.cfg.GetVGName()
  shared_secret = lu.cfg.GenerateDRBDSecret()

  data_lv = objects.Disk(dev_type=constants.LD_LV, size=size,
                         logical_id=(vgname, names[0]))
  meta_lv = objects.Disk(dev_type=constants.LD_LV, size=128,
                         logical_id=(vgname, names[1]))

  drbd_id = (primary, secondary, port, p_minor, s_minor, shared_secret)
  return objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                      logical_id=drbd_id,
                      children=[data_lv, meta_lv],
                      iv_name=iv_name)
2995

    
2996

    
2997
def _GenerateDiskTemplate(lu, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Build the list of disk objects for the given disk template.

  Every template except the diskless one yields two disks, "sda" (of
  size disk_sz) and "sdb" (of size swap_sz).

  Args:
    lu: the logical unit whose cfg object is used
    template_name: one of the DT_* disk template constants
    instance_name: instance name, used when allocating drbd minors
    primary_node: the primary node of the instance
    secondary_nodes: list of secondary nodes; must have exactly one entry
      for drbd8 and none for the plain and file templates
    disk_sz: size of the "sda" disk
    swap_sz: size of the "sdb" disk
    file_storage_dir: directory holding the disks of the file template
    file_driver: driver stored in the logical_id of file-based disks

  Returns:
    the list of objects.Disk (empty for the diskless template)

  Raises:
    errors.ProgrammerError: for an unknown template or a number of
      secondary nodes not matching the template

  """
  #TODO: compute space requirements

  vgname = lu.cfg.GetVGName()

  if template_name == constants.DT_DISKLESS:
    return []

  if template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    lv_names = _GenerateUniqueNames(lu, [".sda", ".sdb"])
    plain_sda = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                             logical_id=(vgname, lv_names[0]),
                             iv_name="sda")
    plain_sdb = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                             logical_id=(vgname, lv_names[1]),
                             iv_name="sdb")
    return [plain_sda, plain_sdb]

  if template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    # two minors per node: one for each of the two disks
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node, primary_node, remote_node, remote_node], instance_name)
    (minor_pa, minor_pb, minor_sa, minor_sb) = minors

    lv_names = _GenerateUniqueNames(lu, [".sda_data", ".sda_meta",
                                         ".sdb_data", ".sdb_meta"])
    drbd_sda = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                    disk_sz, lv_names[0:2], "sda",
                                    minor_pa, minor_sa)
    drbd_sdb = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                    swap_sz, lv_names[2:4], "sdb",
                                    minor_pb, minor_sb)
    return [drbd_sda, drbd_sdb]

  if template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    sda_id = (file_driver, "%s/sda" % file_storage_dir)
    file_sda = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
                            iv_name="sda", logical_id=sda_id)
    sdb_id = (file_driver, "%s/sdb" % file_storage_dir)
    file_sdb = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
                            iv_name="sdb", logical_id=sdb_id)
    return [file_sda, file_sdb]

  raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3052

    
3053

    
3054
def _GetInstanceInfoText(instance):
3055
  """Compute that text that should be added to the disk's metadata.
3056

3057
  """
3058
  return "originstname+%s" % instance.name
3059

    
3060

    
3061
def _CreateDisks(lu, instance):
  """Create all disks for an instance.

  For file-based instances the storage directory is created on the
  primary node first. Each disk is then created on all secondary nodes
  before it is created on the primary node. The first failure is logged
  and stops the process.

  Args:
    lu: the logical unit whose rpc object is used
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    dir_result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
                                                     file_storage_dir)

    # an empty/false result is reported as a connection problem, a false
    # first element as a failure to create the directory
    if not dir_result:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False
    if not dir_result[0]:
      logger.Error("failed to create directory '%s'" % file_storage_dir)
      return False

  for disk in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (disk.iv_name, instance.name))
    #HARDCODE
    for snode in instance.secondary_nodes:
      on_secondary = _CreateBlockDevOnSecondary(lu, snode, instance,
                                                disk, False, info)
      if not on_secondary:
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (disk.iv_name, disk, snode))
        return False
    #HARDCODE
    on_primary = _CreateBlockDevOnPrimary(lu, instance.primary_node,
                                          instance, disk, info)
    if not on_primary:
      logger.Error("failed to create volume %s on primary!" %
                   disk.iv_name)
      return False

  return True
3106

    
3107

    
3108
def _RemoveDisks(lu, instance):
  """Remove all disks for an instance.

  Every device reported by ComputeNodeTree for each instance disk is
  removed on its node; for file-based instances the storage directory is
  removed from the primary node afterwards. A failure is logged and
  remembered, but the removal continues with the remaining devices
  (compare with `_CreateDisks()`, which stops at the first failure).

  Args:
    lu: the logical unit whose cfg and rpc objects are used
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  all_removed = True
  for top_disk in instance.disks:
    for node, disk in top_disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(disk, node)
      removed = lu.rpc.call_blockdev_remove(node, disk)
      if not removed:
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (top_disk.iv_name, node))
        all_removed = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    dir_removed = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
                                                      file_storage_dir)
    if not dir_removed:
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      all_removed = False

  return all_removed
3143

    
3144

    
3145
def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group

  This is currently hard-coded for the two-drive layout.

  Args:
    disk_template: one of the DT_* disk template constants
    disk_size: size of the data disk
    swap_size: size of the swap disk

  Returns:
    the required size, or None for the diskless and file templates

  Raises:
    errors.ProgrammerError: if the disk template is not known here

  """
  both_disks = disk_size + swap_size

  # Required free disk space as a function of disk and swap space
  required_by_template = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: both_disks,
    # 256 MB are added for drbd metadata, 128MB for each drbd device
    constants.DT_DRBD8: both_disks + 256,
    constants.DT_FILE: None,
  }

  try:
    return required_by_template[disk_template]
  except KeyError:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" %  disk_template)
3165

    
3166

    
3167
def _CheckHVParams(lu, nodenames, hvname, hvparams):
  """Hypervisor parameter validation.

  This function abstracts the hypervisor parameter validation to be
  used in both instance create and instance modify. Every node has to
  return a non-empty tuple or list whose first element is true; otherwise
  the second element is reported as the validation error.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type nodenames: list
  @param nodenames: the list of nodes on which we should check
  @type hvname: string
  @param hvname: the name of the hypervisor we should use
  @type hvparams: dict
  @param hvparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  results = lu.rpc.call_hypervisor_validate_params(nodenames,
                                                   hvname,
                                                   hvparams)
  for node in nodenames:
    node_result = results.get(node)
    usable = node_result and isinstance(node_result, (tuple, list))
    if not usable:
      raise errors.OpPrereqError("Cannot get current information"
                                 " from node '%s' (%s)" % (node, node_result))
    valid = node_result[0]
    if not valid:
      raise errors.OpPrereqError("Hypervisor parameter validation failed:"
                                 " %s" % node_result[1])
3195

    
3196

    
3197
class LUCreateInstance(LogicalUnit):
  """Create an instance.

  Depending on the opcode's C{mode}, the instance is either created from
  scratch (C{INSTANCE_CREATE}) or imported from an existing export
  (C{INSTANCE_IMPORT}). The nodes are either given in the opcode or
  selected through an iallocator.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk_size",
              "disk_template", "swap_size", "mode", "start",
              "wait_for_sync", "ip_check", "mac",
              "hvparams", "beparams"]
  REQ_BGL = False

  def _ExpandNode(self, node):
    """Expands and checks one node name.

    @param node: the node name as given in the opcode
    @return: the name returned by the configuration for this node
    @raise errors.OpPrereqError: if the configuration cannot expand the name

    """
    node_full = self.cfg.ExpandNodeName(node)
    if node_full is None:
      raise errors.OpPrereqError("Unknown node %s" % node)
    return node_full

  def ExpandNames(self):
    """ExpandNames for CreateInstance.

    Figure out the right locks for instance creation. Besides declaring
    the locks, this runs the cheap parameter checks (creation mode, disk
    template, hypervisor, instance name, IP, MAC, file storage options)
    and stores the values needed later on (C{be_full}, C{inst_ip},
    C{check_ip}).

    @raise errors.OpPrereqError: if any of the checked parameters is invalid

    """
    self.needed_locks = {}

    # set optional parameters to none if they don't exist
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    # cheap checks, mostly valid constants given

    # verify creation mode
    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    # disk template verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    # fall back to the hypervisor type configured in the cluster
    if self.op.hypervisor is None:
      self.op.hypervisor = self.cfg.GetHypervisorType()

    cluster = self.cfg.GetClusterInfo()
    enabled_hvs = cluster.enabled_hypervisors
    if self.op.hypervisor not in enabled_hvs:
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
                                 " cluster (%s)" % (self.op.hypervisor,
                                  ",".join(enabled_hvs)))

    # check hypervisor parameter syntax (locally)

    filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
                                  self.op.hvparams)
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
    hv_type.CheckParameterSyntax(filled_hvp)

    # fill and remember the beparams dict
    self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                    self.op.beparams)

    #### instance parameters check

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)
    self.op.instance_name = instance_name = hostname1.name

    # this is just a preventive check, but someone might still add this
    # instance in the meantime, and creation will fail at lock-add time
    if instance_name in self.cfg.GetInstanceList():
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    self.add_locks[locking.LEVEL_INSTANCE] = instance_name

    # ip validity checks: "none" (or a missing value) means no IP, "auto"
    # means the address the instance name resolved to
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = self.op.ip = inst_ip
    # used in CheckPrereq for ip ping check
    self.check_ip = hostname1.ip

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # file storage checks
    if (self.op.file_driver and
        self.op.file_driver not in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    # Exec joins this directory below the cluster file storage directory,
    # so an absolute path is what has to be rejected here
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory path must be"
                                 " relative, not absolute")

    ### Node/iallocator related checks
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator:
      # the nodes are not known yet, so all of them are locked
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.op.pnode = self._ExpandNode(self.op.pnode)
      nodelist = [self.op.pnode]
      if self.op.snode is not None:
        self.op.snode = self._ExpandNode(self.op.snode)
        nodelist.append(self.op.snode)
      self.needed_locks[locking.LEVEL_NODE] = nodelist

    # in case of import lock the source node too
    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)

      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      self.op.src_node = src_node = self._ExpandNode(src_node)
      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
        self.needed_locks[locking.LEVEL_NODE].append(src_node)

    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    The request describes two disks (the data disk and the swap disk) and
    one NIC. On success C{self.op.pnode} is set to the first returned
    node and, if two nodes were required, C{self.op.snode} to the second.

    @raise errors.OpPrereqError: if the allocator run was not successful
        or returned an unexpected number of nodes

    """
    disks = [{"size": self.op.disk_size, "mode": "w"},
             {"size": self.op.swap_size, "mode": "w"}]
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
             "bridge": self.op.bridge}]
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.be_full[constants.BE_VCPUS],
                     mem_size=self.be_full[constants.BE_MEMORY],
                     disks=disks,
                     nics=nics,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    logger.ToStdout("Selected nodes for the instance: %s" %
                    (", ".join(ial.nodes),))
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
                (self.op.instance_name, self.op.iallocator, ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    @return: a tuple of the environment dict and the node list, the
        latter given twice

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.be_full[constants.BE_MEMORY],
      vcpus=self.be_full[constants.BE_VCPUS],
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    ))

    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl


  def CheckPrereq(self):
    """Check prerequisites.

    This reads the export information (import mode only), does the IP
    conflict check, runs the iallocator if one was requested and then
    verifies the selected nodes: free disk space, hypervisor parameters,
    OS availability, bridge existence and (if the instance is to be
    started) free memory.

    @raise errors.OpPrereqError: if any of the checks fails

    """
    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")


    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = self.op.src_node
      src_path = self.op.src_path

      export_info = self.rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage

    # ip ping checks (we use the same ip that was resolved in ExpandNames)

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (self.check_ip, self.op.instance_name))

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    #### allocator run

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
    assert self.pnode is not None, \
      "Cannot retrieve locked node %s" % self.op.pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if self.op.snode is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")
      if self.op.snode == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(self.op.snode)

    nodenames = [pnode.name] + self.secondaries

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

    # Check lv size requirements (a req_size of None skips the check)
    if req_size is not None:
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                         self.op.hypervisor)
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > vg_free:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, vg_free, req_size))

    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)

    # os verification
    os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node"  % self.op.os_type)

    # bridge check on primary node
    if not self.rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.be_full[constants.BE_MEMORY],
                           self.op.hypervisor)

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    The disks are created first, then the instance is added to the
    configuration; after the disk status check the OS create or import
    scripts are run (unless the instance is diskless) and finally the
    instance is started, if requested.

    @param feedback_fn: function called with progress messages
    @raise errors.OpExecError: if the disk creation, the disk status
        check, the OS installation or the instance startup fails

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    ht_kind = self.op.hypervisor
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    ##if self.op.vnc_bind_address is None:
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.cfg.GetFileStorageDir(),
                                        string_file_storage_dir, instance))


    disks = _GenerateDiskTemplate(self,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            beparams=self.op.beparams,
                            hvparams=self.op.hvparams,
                            hypervisor=self.op.hypervisor,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self, iobj):
      _RemoveDisks(self, iobj)
      self.cfg.ReleaseDRBDMinors(instance)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Declare that we don't want to remove the instance lock anymore, as we've
    # added the instance to the config
    del self.remove_locks[locking.LEVEL_INSTANCE]
    # Remove the temp. assignments for the instance's drbds
    self.cfg.ReleaseDRBDMinors(instance)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, iobj)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      # undo both the disk creation and the configuration change
      _RemoveDisks(self, iobj)
      self.cfg.RemoveInstance(iobj.name)
      # Make sure the instance lock gets removed
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not self.rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        cluster_name = self.cfg.GetClusterName()
        if not self.rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                                src_node, src_image,
                                                cluster_name):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not self.rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")
3662

    
3663

    
3664
class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This LU is somewhat special: it does not open the console itself but
  returns the command line which has to be run on the master node in
  order to connect to the console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand the instance name and declare its lock.

    """
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and remembers its
    configuration object in C{self.instance}.

    """
    name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % name

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    @param feedback_fn: not used by this LU
    @return: the ssh command line, as built by C{self.ssh.BuildCmd}
    @raise errors.OpExecError: if the primary node does not answer or
        the instance is not in the node's instance list

    """
    inst = self.instance
    pnode = inst.primary_node

    # ask the primary node which instances it runs under our hypervisor
    per_node = self.rpc.call_instance_list([pnode], [inst.hypervisor])
    running = per_node[pnode]
    if running is False:
      raise errors.OpExecError("Can't connect to node %s." % pnode)

    if inst.name not in running:
      raise errors.OpExecError("Instance %s is not running." % inst.name)

    logger.Debug("connecting to console of %s on %s" % (inst.name, pnode))

    hv_obj = hypervisor.GetHypervisor(inst.hypervisor)
    shell_cmd = hv_obj.GetShellCommandForConsole(inst)

    # build ssh cmdline
    return self.ssh.BuildCmd(pnode, "root", shell_cmd, batch=True, tty=True)
3710

    
3711

    
3712
class LUReplaceDisks(LogicalUnit):
3713
  """Replace the disks of an instance.
3714

3715
  """
3716
  HPATH = "mirrors-replace"
3717
  HTYPE = constants.HTYPE_INSTANCE
3718
  _OP_REQP = ["instance_name", "mode", "disks"]
3719
  REQ_BGL = False
3720

    
3721
  def ExpandNames(self):
    """Expand the names and declare the needed locks.

    The instance itself is always locked. For the node level there are
    three cases: with an iallocator all the nodes are locked, with an
    explicit new secondary that node is locked and the lock list is marked
    C{LOCKS_APPEND}, otherwise the list starts empty and is marked
    C{LOCKS_REPLACE}.

    @raise errors.OpPrereqError: if both an iallocator and a new secondary
        are given, or if the new secondary is not a known node

    """
    self._ExpandAndLockInstance()

    op = self.op
    if not hasattr(op, "remote_node"):
      op.remote_node = None
    wanted = op.remote_node

    if getattr(op, "iallocator", None) is not None:
      if wanted is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
      return

    if wanted is None:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
      return

    expanded = self.cfg.ExpandNodeName(wanted)
    if expanded is None:
      raise errors.OpPrereqError("Node '%s' not known" %
                                 wanted)
    op.remote_node = expanded
    self.needed_locks[locking.LEVEL_NODE] = [expanded]
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3744

    
3745
  def DeclareLocks(self, level):
    """Declare the instance's nodes when handling the node lock level.

    Nothing is done for other levels, nor when all the nodes are already
    going to be locked.

    @param level: the locking level for which locks are being declared

    """
    if level != locking.LEVEL_NODE:
      return
    if self.needed_locks[locking.LEVEL_NODE] is locking.ALL_SET:
      # the whole node set is locked, nothing more to declare
      return
    # otherwise we have to declare the instance's primary/secondary nodes
    self._LockInstancesNodes()
3751

    
3752
  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    The allocator is run in C{IALLOCATOR_MODE_RELOC} mode with the current
    secondary (C{self.sec_node}) as the node to relocate from; the first
    node it returns is stored in C{self.op.remote_node}.

    @raise errors.OpPrereqError: if the allocator run was not successful
        or returned an unexpected number of nodes

    """
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      # the message has three placeholders: the allocator name has to be
      # passed together with the two node counts
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    logger.ToStdout("Selected new secondary for the instance: %s" %
                    self.op.remote_node)
3774

    
3775
  def BuildHooksEnv(self):
    """Build hooks env.

    The node list holds the master and the primary node of the instance,
    plus the new secondary when one is set.

    @return: a tuple of the environment dict and the node list, the
        latter given twice

    """
    inst = self.instance
    new_secondary = self.op.remote_node

    env = {}
    env["MODE"] = self.op.mode
    env["NEW_SECONDARY"] = new_secondary
    env["OLD_SECONDARY"] = inst.secondary_nodes[0]
    env.update(_BuildInstanceHookEnvByObject(self, inst))

    hook_nodes = [self.cfg.GetMasterNode(), inst.primary_node]
    if new_secondary is not None:
      hook_nodes.append(new_secondary)
    return env, hook_nodes, hook_nodes
3794

    
3795
  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster, that its disk layout
    is network mirrored with exactly one secondary, and that the requested
    mode, new secondary and disk names fit together. For drbd8 instances
    it also computes C{tgt_node} and C{oth_node} (and C{new_node} when
    replacing on the secondary).

    @raise errors.OpPrereqError: if any of the checks fails

    """
    inst = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert inst is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = inst

    if inst.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    sec_count = len(inst.secondary_nodes)
    if sec_count != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 sec_count)

    self.sec_node = inst.secondary_nodes[0]

    # the allocator, if any, fills in self.op.remote_node
    if getattr(self.op, "iallocator", None) is not None:
      self._RunAllocator()

    new_sec = self.op.remote_node
    if new_sec is None:
      self.remote_node_info = None
    else:
      self.remote_node_info = self.cfg.GetNodeInfo(new_sec)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % new_sec

    if new_sec == inst.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif (new_sec == self.sec_node and
          self.op.mode == constants.REPLACE_DISK_SEC):
      # this is for DRBD8, where we can't execute the same mode of
      # replacement as for drbd7 (no different port allocated)
      raise errors.OpPrereqError("Same secondary given, cannot execute"
                                 " replacement")

    if inst.disk_template == constants.DT_DRBD8:
      if (new_sec is not None and
          self.op.mode == constants.REPLACE_DISK_ALL):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if new_sec is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = inst.primary_node
        self.oth_node = inst.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        # new_sec can be None, in which case we don't change the secondary
        self.new_node = new_sec
        self.tgt_node = inst.secondary_nodes[0]
        self.oth_node = inst.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    # every requested disk must exist on the instance
    for disk_name in self.op.disks:
      if inst.FindDisk(disk_name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (disk_name, inst.name))
3866

    
3867
  def _ExecD8DiskOnly(self, feedback_fn):
3868
    """Replace a disk on the primary or secondary for dbrd8.
3869

3870
    The algorithm for replace is quite complicated:
3871
      - for each disk to be replaced:
3872
        - create new LVs on the target node with unique names
3873
        - detach old LVs from the drbd device
3874
        - rename old LVs to name_replaced.<time_t>
3875
        - rename new LVs to old LVs
3876
        - attach the new LVs (with the old names now) to the drbd device
3877
      - wait for sync across all devices
3878
      - for each modified disk:
3879
        - remove old LVs (which have the name name_replaces.<time_t>)
3880

3881
    Failures are not very well handled.
3882

3883
    """
3884
    steps_total = 6
3885
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3886
    instance = self.instance
3887
    iv_names = {}
3888
    vgname = self.cfg.GetVGName()
3889
    # start of work
3890
    cfg = self.cfg
3891
    tgt_node = self.tgt_node
3892
    oth_node = self.oth_node
3893

    
3894
    # Step: check device activation
3895
    self.proc.LogStep(1, steps_total, "check device existence")
3896
    info("checking volume groups")
3897
    my_vg = cfg.GetVGName()
3898
    results = self.rpc.call_vg_list([oth_node, tgt_node])
3899
    if not results:
3900
      raise errors.OpExecError("Can't list volume groups on the nodes")
3901
    for node in oth_node, tgt_node:
3902
      res = results.get(node, False)
3903
      if not res or my_vg not in res:
3904
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3905
                                 (my_vg, node))
3906
    for dev in instance.disks:
3907
      if not dev.iv_name in self.op.disks:
3908
        continue
3909
      for node in tgt_node, oth_node:
3910
        info("checking %s on %s" % (dev.iv_name, node))
3911
        cfg.SetDiskID(dev, node)
3912
        if not self.rpc.call_blockdev_find(node, dev):
3913
          raise errors.OpExecError("Can't find device %s on node %s" %
3914
                                   (dev.iv_name, node))
3915

    
3916
    # Step: check other node consistency
3917
    self.proc.LogStep(2, steps_total, "check peer consistency")
3918
    for dev in instance.disks:
3919
      if not dev.iv_name in self.op.disks:
3920
        continue
3921
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3922
      if not _CheckDiskConsistency(self, dev, oth_node,
3923
                                   oth_node==instance.primary_node):
3924
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3925
                                 " to replace disks on this node (%s)" %
3926
                                 (oth_node, tgt_node))
3927

    
3928
    # Step: create new storage
3929
    self.proc.LogStep(3, steps_total, "allocate new storage")
3930
    for dev in instance.disks:
3931
      if not dev.iv_name in self.op.disks:
3932
        continue
3933
      size = dev.size
3934
      cfg.SetDiskID(dev, tgt_node)
3935
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3936
      names = _GenerateUniqueNames(self, lv_names)
3937
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3938
                             logical_id=(vgname, names[0]))
3939
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3940
                             logical_id=(vgname, names[1]))
3941
      new_lvs = [lv_data, lv_meta]
3942
      old_lvs = dev.children
3943
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3944
      info("creating new local storage on %s for %s" %
3945
           (tgt_node, dev.iv_name))
3946
      # since we *always* want to create this LV, we use the
3947
      # _Create...OnPrimary (which forces the creation), even if we
3948
      # are talking about the secondary node
3949
      for new_lv in new_lvs:
3950
        if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
3951
                                        _GetInstanceInfoText(instance)):
3952
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3953
                                   " node '%s'" %
3954
                                   (new_lv.logical_id[1], tgt_node))
3955

    
3956
    # Step: for each lv, detach+rename*2+attach
3957
    self.proc.LogStep(4, steps_total, "change drbd configuration")
3958
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3959
      info("detaching %s drbd from local storage" % dev.iv_name)
3960
      if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3961
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3962
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3963
      #dev.children = []
3964
      #cfg.Update(instance)
3965

    
3966
      # ok, we created the new LVs, so now we know we have the needed
3967
      # storage; as such, we proceed on the target node to rename
3968
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3969
      # using the assumption that logical_id == physical_id (which in
3970
      # turn is the unique_id on that node)
3971

    
3972
      # FIXME(iustin): use a better name for the replaced LVs
3973
      temp_suffix = int(time.time())
3974
      ren_fn = lambda d, suff: (d.physical_id[0