Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib.py @ 3ccafd0e

History | View | Annotate | Download (187.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33
import logging
34
import copy
35

    
36
from ganeti import ssh
37
from ganeti import logger
38
from ganeti import utils
39
from ganeti import errors
40
from ganeti import hypervisor
41
from ganeti import locking
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import opcodes
45
from ganeti import serializer
46

    
47

    
48
class LogicalUnit(object):
49
  """Logical Unit base class.
50

51
  Subclasses must follow these rules:
52
    - implement ExpandNames
53
    - implement CheckPrereq
54
    - implement Exec
55
    - implement BuildHooksEnv
56
    - redefine HPATH and HTYPE
57
    - optionally redefine their run requirements:
58
        REQ_MASTER: the LU needs to run on the master node
59
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60

61
  Note that all commands require root permissions.
62

63
  """
64
  HPATH = None
65
  HTYPE = None
66
  _OP_REQP = []
67
  REQ_MASTER = True
68
  REQ_BGL = True
69

    
70
  def __init__(self, processor, op, context, rpc):
71
    """Constructor for LogicalUnit.
72

73
    This needs to be overriden in derived classes in order to check op
74
    validity.
75

76
    """
77
    self.proc = processor
78
    self.op = op
79
    self.cfg = context.cfg
80
    self.context = context
81
    self.rpc = rpc
82
    # Dicts used to declare locking needs to mcpu
83
    self.needed_locks = None
84
    self.acquired_locks = {}
85
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
86
    self.add_locks = {}
87
    self.remove_locks = {}
88
    # Used to force good behavior when calling helper functions
89
    self.recalculate_locks = {}
90
    self.__ssh = None
91

    
92
    for attr_name in self._OP_REQP:
93
      attr_val = getattr(op, attr_name, None)
94
      if attr_val is None:
95
        raise errors.OpPrereqError("Required parameter '%s' missing" %
96
                                   attr_name)
97

    
98
    if not self.cfg.IsCluster():
99
      raise errors.OpPrereqError("Cluster not initialized yet,"
100
                                 " use 'gnt-cluster init' first.")
101
    if self.REQ_MASTER:
102
      master = self.cfg.GetMasterNode()
103
      if master != utils.HostInfo().name:
104
        raise errors.OpPrereqError("Commands must be run on the master"
105
                                   " node %s" % master)
106

    
107
  def __GetSSH(self):
108
    """Returns the SshRunner object
109

110
    """
111
    if not self.__ssh:
112
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
113
    return self.__ssh
114

    
115
  ssh = property(fget=__GetSSH)
116

    
117
  def ExpandNames(self):
118
    """Expand names for this LU.
119

120
    This method is called before starting to execute the opcode, and it should
121
    update all the parameters of the opcode to their canonical form (e.g. a
122
    short node name must be fully expanded after this method has successfully
123
    completed). This way locking, hooks, logging, ecc. can work correctly.
124

125
    LUs which implement this method must also populate the self.needed_locks
126
    member, as a dict with lock levels as keys, and a list of needed lock names
127
    as values. Rules:
128
      - Use an empty dict if you don't need any lock
129
      - If you don't need any lock at a particular level omit that level
130
      - Don't put anything for the BGL level
131
      - If you want all locks at a level use locking.ALL_SET as a value
132

133
    If you need to share locks (rather than acquire them exclusively) at one
134
    level you can modify self.share_locks, setting a true value (usually 1) for
135
    that level. By default locks are not shared.
136

137
    Examples:
138
    # Acquire all nodes and one instance
139
    self.needed_locks = {
140
      locking.LEVEL_NODE: locking.ALL_SET,
141
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
142
    }
143
    # Acquire just two nodes
144
    self.needed_locks = {
145
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
146
    }
147
    # Acquire no locks
148
    self.needed_locks = {} # No, you can't leave it to the default value None
149

150
    """
151
    # The implementation of this method is mandatory only if the new LU is
152
    # concurrent, so that old LUs don't need to be changed all at the same
153
    # time.
154
    if self.REQ_BGL:
155
      self.needed_locks = {} # Exclusive LUs don't need locks.
156
    else:
157
      raise NotImplementedError
158

    
159
  def DeclareLocks(self, level):
160
    """Declare LU locking needs for a level
161

162
    While most LUs can just declare their locking needs at ExpandNames time,
163
    sometimes there's the need to calculate some locks after having acquired
164
    the ones before. This function is called just before acquiring locks at a
165
    particular level, but after acquiring the ones at lower levels, and permits
166
    such calculations. It can be used to modify self.needed_locks, and by
167
    default it does nothing.
168

169
    This function is only called if you have something already set in
170
    self.needed_locks for the level.
171

172
    @param level: Locking level which is going to be locked
173
    @type level: member of ganeti.locking.LEVELS
174

175
    """
176

    
177
  def CheckPrereq(self):
178
    """Check prerequisites for this LU.
179

180
    This method should check that the prerequisites for the execution
181
    of this LU are fulfilled. It can do internode communication, but
182
    it should be idempotent - no cluster or system changes are
183
    allowed.
184

185
    The method should raise errors.OpPrereqError in case something is
186
    not fulfilled. Its return value is ignored.
187

188
    This method should also update all the parameters of the opcode to
189
    their canonical form if it hasn't been done by ExpandNames before.
190

191
    """
192
    raise NotImplementedError
193

    
194
  def Exec(self, feedback_fn):
195
    """Execute the LU.
196

197
    This method should implement the actual work. It should raise
198
    errors.OpExecError for failures that are somewhat dealt with in
199
    code, or expected.
200

201
    """
202
    raise NotImplementedError
203

    
204
  def BuildHooksEnv(self):
205
    """Build hooks environment for this LU.
206

207
    This method should return a three-node tuple consisting of: a dict
208
    containing the environment that will be used for running the
209
    specific hook for this LU, a list of node names on which the hook
210
    should run before the execution, and a list of node names on which
211
    the hook should run after the execution.
212

213
    The keys of the dict must not have 'GANETI_' prefixed as this will
214
    be handled in the hooks runner. Also note additional keys will be
215
    added by the hooks runner. If the LU doesn't define any
216
    environment, an empty dict (and not None) should be returned.
217

218
    No nodes should be returned as an empty list (and not None).
219

220
    Note that if the HPATH for a LU class is None, this function will
221
    not be called.
222

223
    """
224
    raise NotImplementedError
225

    
226
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
227
    """Notify the LU about the results of its hooks.
228

229
    This method is called every time a hooks phase is executed, and notifies
230
    the Logical Unit about the hooks' result. The LU can then use it to alter
231
    its result based on the hooks.  By default the method does nothing and the
232
    previous result is passed back unchanged but any LU can define it if it
233
    wants to use the local cluster hook-scripts somehow.
234

235
    Args:
236
      phase: the hooks phase that has just been run
237
      hooks_results: the results of the multi-node hooks rpc call
238
      feedback_fn: function to send feedback back to the caller
239
      lu_result: the previous result this LU had, or None in the PRE phase.
240

241
    """
242
    return lu_result
243

    
244
  def _ExpandAndLockInstance(self):
245
    """Helper function to expand and lock an instance.
246

247
    Many LUs that work on an instance take its name in self.op.instance_name
248
    and need to expand it and then declare the expanded name for locking. This
249
    function does it, and then updates self.op.instance_name to the expanded
250
    name. It also initializes needed_locks as a dict, if this hasn't been done
251
    before.
252

253
    """
254
    if self.needed_locks is None:
255
      self.needed_locks = {}
256
    else:
257
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
258
        "_ExpandAndLockInstance called with instance-level locks set"
259
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
260
    if expanded_name is None:
261
      raise errors.OpPrereqError("Instance '%s' not known" %
262
                                  self.op.instance_name)
263
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
264
    self.op.instance_name = expanded_name
265

    
266
  def _LockInstancesNodes(self, primary_only=False):
267
    """Helper function to declare instances' nodes for locking.
268

269
    This function should be called after locking one or more instances to lock
270
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
271
    with all primary or secondary nodes for instances already locked and
272
    present in self.needed_locks[locking.LEVEL_INSTANCE].
273

274
    It should be called from DeclareLocks, and for safety only works if
275
    self.recalculate_locks[locking.LEVEL_NODE] is set.
276

277
    In the future it may grow parameters to just lock some instance's nodes, or
278
    to just lock primaries or secondary nodes, if needed.
279

280
    If should be called in DeclareLocks in a way similar to:
281

282
    if level == locking.LEVEL_NODE:
283
      self._LockInstancesNodes()
284

285
    @type primary_only: boolean
286
    @param primary_only: only lock primary nodes of locked instances
287

288
    """
289
    assert locking.LEVEL_NODE in self.recalculate_locks, \
290
      "_LockInstancesNodes helper function called with no nodes to recalculate"
291

    
292
    # TODO: check if we're really been called with the instance locks held
293

    
294
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
295
    # future we might want to have different behaviors depending on the value
296
    # of self.recalculate_locks[locking.LEVEL_NODE]
297
    wanted_nodes = []
298
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
299
      instance = self.context.cfg.GetInstanceInfo(instance_name)
300
      wanted_nodes.append(instance.primary_node)
301
      if not primary_only:
302
        wanted_nodes.extend(instance.secondary_nodes)
303

    
304
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
305
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
306
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
307
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
308

    
309
    del self.recalculate_locks[locking.LEVEL_NODE]
310

    
311

    
312
class NoHooksLU(LogicalUnit):
313
  """Simple LU which runs no hooks.
314

315
  This LU is intended as a parent for other LogicalUnits which will
316
  run no hooks, in order to reduce duplicate code.
317

318
  """
319
  HPATH = None
320
  HTYPE = None
321

    
322

    
323
def _GetWantedNodes(lu, nodes):
324
  """Returns list of checked and expanded node names.
325

326
  Args:
327
    nodes: List of nodes (strings) or None for all
328

329
  """
330
  if not isinstance(nodes, list):
331
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
332

    
333
  if not nodes:
334
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
335
      " non-empty list of nodes whose name is to be expanded.")
336

    
337
  wanted = []
338
  for name in nodes:
339
    node = lu.cfg.ExpandNodeName(name)
340
    if node is None:
341
      raise errors.OpPrereqError("No such node name '%s'" % name)
342
    wanted.append(node)
343

    
344
  return utils.NiceSort(wanted)
345

    
346

    
347
def _GetWantedInstances(lu, instances):
348
  """Returns list of checked and expanded instance names.
349

350
  Args:
351
    instances: List of instances (strings) or None for all
352

353
  """
354
  if not isinstance(instances, list):
355
    raise errors.OpPrereqError("Invalid argument type 'instances'")
356

    
357
  if instances:
358
    wanted = []
359

    
360
    for name in instances:
361
      instance = lu.cfg.ExpandInstanceName(name)
362
      if instance is None:
363
        raise errors.OpPrereqError("No such instance name '%s'" % name)
364
      wanted.append(instance)
365

    
366
  else:
367
    wanted = lu.cfg.GetInstanceList()
368
  return utils.NiceSort(wanted)
369

    
370

    
371
def _CheckOutputFields(static, dynamic, selected):
372
  """Checks whether all selected fields are valid.
373

374
  Args:
375
    static: Static fields
376
    dynamic: Dynamic fields
377

378
  """
379
  static_fields = frozenset(static)
380
  dynamic_fields = frozenset(dynamic)
381

    
382
  all_fields = static_fields | dynamic_fields
383

    
384
  if not all_fields.issuperset(selected):
385
    raise errors.OpPrereqError("Unknown output fields selected: %s"
386
                               % ",".join(frozenset(selected).
387
                                          difference(all_fields)))
388

    
389

    
390
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
391
                          memory, vcpus, nics):
392
  """Builds instance related env variables for hooks from single variables.
393

394
  Args:
395
    secondary_nodes: List of secondary nodes as strings
396
  """
397
  env = {
398
    "OP_TARGET": name,
399
    "INSTANCE_NAME": name,
400
    "INSTANCE_PRIMARY": primary_node,
401
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
402
    "INSTANCE_OS_TYPE": os_type,
403
    "INSTANCE_STATUS": status,
404
    "INSTANCE_MEMORY": memory,
405
    "INSTANCE_VCPUS": vcpus,
406
  }
407

    
408
  if nics:
409
    nic_count = len(nics)
410
    for idx, (ip, bridge, mac) in enumerate(nics):
411
      if ip is None:
412
        ip = ""
413
      env["INSTANCE_NIC%d_IP" % idx] = ip
414
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
415
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
416
  else:
417
    nic_count = 0
418

    
419
  env["INSTANCE_NIC_COUNT"] = nic_count
420

    
421
  return env
422

    
423

    
424
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
425
  """Builds instance related env variables for hooks from an object.
426

427
  Args:
428
    instance: objects.Instance object of instance
429
    override: dict of values to override
430
  """
431
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
432
  args = {
433
    'name': instance.name,
434
    'primary_node': instance.primary_node,
435
    'secondary_nodes': instance.secondary_nodes,
436
    'os_type': instance.os,
437
    'status': instance.os,
438
    'memory': bep[constants.BE_MEMORY],
439
    'vcpus': bep[constants.BE_VCPUS],
440
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
441
  }
442
  if override:
443
    args.update(override)
444
  return _BuildInstanceHookEnv(**args)
445

    
446

    
447
def _CheckInstanceBridgesExist(lu, instance):
448
  """Check that the brigdes needed by an instance exist.
449

450
  """
451
  # check bridges existance
452
  brlist = [nic.bridge for nic in instance.nics]
453
  if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
454
    raise errors.OpPrereqError("one or more target bridges %s does not"
455
                               " exist on destination node '%s'" %
456
                               (brlist, instance.primary_node))
457

    
458

    
459
class LUDestroyCluster(NoHooksLU):
460
  """Logical unit for destroying the cluster.
461

462
  """
463
  _OP_REQP = []
464

    
465
  def CheckPrereq(self):
466
    """Check prerequisites.
467

468
    This checks whether the cluster is empty.
469

470
    Any errors are signalled by raising errors.OpPrereqError.
471

472
    """
473
    master = self.cfg.GetMasterNode()
474

    
475
    nodelist = self.cfg.GetNodeList()
476
    if len(nodelist) != 1 or nodelist[0] != master:
477
      raise errors.OpPrereqError("There are still %d node(s) in"
478
                                 " this cluster." % (len(nodelist) - 1))
479
    instancelist = self.cfg.GetInstanceList()
480
    if instancelist:
481
      raise errors.OpPrereqError("There are still %d instance(s) in"
482
                                 " this cluster." % len(instancelist))
483

    
484
  def Exec(self, feedback_fn):
485
    """Destroys the cluster.
486

487
    """
488
    master = self.cfg.GetMasterNode()
489
    if not self.rpc.call_node_stop_master(master, False):
490
      raise errors.OpExecError("Could not disable the master role")
491
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
492
    utils.CreateBackup(priv_key)
493
    utils.CreateBackup(pub_key)
494
    return master
495

    
496

    
497
class LUVerifyCluster(LogicalUnit):
498
  """Verifies the cluster status.
499

500
  """
501
  HPATH = "cluster-verify"
502
  HTYPE = constants.HTYPE_CLUSTER
503
  _OP_REQP = ["skip_checks"]
504
  REQ_BGL = False
505

    
506
  def ExpandNames(self):
507
    self.needed_locks = {
508
      locking.LEVEL_NODE: locking.ALL_SET,
509
      locking.LEVEL_INSTANCE: locking.ALL_SET,
510
    }
511
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
512

    
513
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
514
                  remote_version, feedback_fn):
515
    """Run multiple tests against a node.
516

517
    Test list:
518
      - compares ganeti version
519
      - checks vg existance and size > 20G
520
      - checks config file checksum
521
      - checks ssh to other nodes
522

523
    Args:
524
      node: name of the node to check
525
      file_list: required list of files
526
      local_cksum: dictionary of local files and their checksums
527

528
    """
529
    # compares ganeti version
530
    local_version = constants.PROTOCOL_VERSION
531
    if not remote_version:
532
      feedback_fn("  - ERROR: connection to %s failed" % (node))
533
      return True
534

    
535
    if local_version != remote_version:
536
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
537
                      (local_version, node, remote_version))
538
      return True
539

    
540
    # checks vg existance and size > 20G
541

    
542
    bad = False
543
    if not vglist:
544
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
545
                      (node,))
546
      bad = True
547
    else:
548
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
549
                                            constants.MIN_VG_SIZE)
550
      if vgstatus:
551
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
552
        bad = True
553

    
554
    if not node_result:
555
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
556
      return True
557

    
558
    # checks config file checksum
559
    # checks ssh to any
560

    
561
    if 'filelist' not in node_result:
562
      bad = True
563
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
564
    else:
565
      remote_cksum = node_result['filelist']
566
      for file_name in file_list:
567
        if file_name not in remote_cksum:
568
          bad = True
569
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
570
        elif remote_cksum[file_name] != local_cksum[file_name]:
571
          bad = True
572
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
573

    
574
    if 'nodelist' not in node_result:
575
      bad = True
576
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
577
    else:
578
      if node_result['nodelist']:
579
        bad = True
580
        for node in node_result['nodelist']:
581
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
582
                          (node, node_result['nodelist'][node]))
583
    if 'node-net-test' not in node_result:
584
      bad = True
585
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
586
    else:
587
      if node_result['node-net-test']:
588
        bad = True
589
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
590
        for node in nlist:
591
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
592
                          (node, node_result['node-net-test'][node]))
593

    
594
    hyp_result = node_result.get('hypervisor', None)
595
    if isinstance(hyp_result, dict):
596
      for hv_name, hv_result in hyp_result.iteritems():
597
        if hv_result is not None:
598
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
599
                      (hv_name, hv_result))
600
    return bad
601

    
602
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
603
                      node_instance, feedback_fn):
604
    """Verify an instance.
605

606
    This function checks to see if the required block devices are
607
    available on the instance's node.
608

609
    """
610
    bad = False
611

    
612
    node_current = instanceconfig.primary_node
613

    
614
    node_vol_should = {}
615
    instanceconfig.MapLVsByNode(node_vol_should)
616

    
617
    for node in node_vol_should:
618
      for volume in node_vol_should[node]:
619
        if node not in node_vol_is or volume not in node_vol_is[node]:
620
          feedback_fn("  - ERROR: volume %s missing on node %s" %
621
                          (volume, node))
622
          bad = True
623

    
624
    if not instanceconfig.status == 'down':
625
      if (node_current not in node_instance or
626
          not instance in node_instance[node_current]):
627
        feedback_fn("  - ERROR: instance %s not running on node %s" %
628
                        (instance, node_current))
629
        bad = True
630

    
631
    for node in node_instance:
632
      if (not node == node_current):
633
        if instance in node_instance[node]:
634
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
635
                          (instance, node))
636
          bad = True
637

    
638
    return bad
639

    
640
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
641
    """Verify if there are any unknown volumes in the cluster.
642

643
    The .os, .swap and backup volumes are ignored. All other volumes are
644
    reported as unknown.
645

646
    """
647
    bad = False
648

    
649
    for node in node_vol_is:
650
      for volume in node_vol_is[node]:
651
        if node not in node_vol_should or volume not in node_vol_should[node]:
652
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
653
                      (volume, node))
654
          bad = True
655
    return bad
656

    
657
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
658
    """Verify the list of running instances.
659

660
    This checks what instances are running but unknown to the cluster.
661

662
    """
663
    bad = False
664
    for node in node_instance:
665
      for runninginstance in node_instance[node]:
666
        if runninginstance not in instancelist:
667
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
668
                          (runninginstance, node))
669
          bad = True
670
    return bad
671

    
672
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
673
    """Verify N+1 Memory Resilience.
674

675
    Check that if one single node dies we can still start all the instances it
676
    was primary for.
677

678
    """
679
    bad = False
680

    
681
    for node, nodeinfo in node_info.iteritems():
682
      # This code checks that every node which is now listed as secondary has
683
      # enough memory to host all instances it is supposed to should a single
684
      # other node in the cluster fail.
685
      # FIXME: not ready for failover to an arbitrary node
686
      # FIXME: does not support file-backed instances
687
      # WARNING: we currently take into account down instances as well as up
688
      # ones, considering that even if they're down someone might want to start
689
      # them even in the event of a node failure.
690
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
691
        needed_mem = 0
692
        for instance in instances:
693
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
694
          if bep[constants.BE_AUTO_BALANCE]:
695
            needed_mem += bep[constants.BE_MEMORY]
696
        if nodeinfo['mfree'] < needed_mem:
697
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
698
                      " failovers should node %s fail" % (node, prinode))
699
          bad = True
700
    return bad
701

    
702
  def CheckPrereq(self):
703
    """Check prerequisites.
704

705
    Transform the list of checks we're going to skip into a set and check that
706
    all its members are valid.
707

708
    """
709
    self.skip_set = frozenset(self.op.skip_checks)
710
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
711
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
712

    
713
  def BuildHooksEnv(self):
714
    """Build hooks env.
715

716
    Cluster-Verify hooks just rone in the post phase and their failure makes
717
    the output be logged in the verify output and the verification to fail.
718

719
    """
720
    all_nodes = self.cfg.GetNodeList()
721
    # TODO: populate the environment with useful information for verify hooks
722
    env = {}
723
    return env, [], all_nodes
724

    
725
  def Exec(self, feedback_fn):
726
    """Verify integrity of cluster, performing various test on nodes.
727

728
    """
729
    bad = False
730
    feedback_fn("* Verifying global settings")
731
    for msg in self.cfg.VerifyConfig():
732
      feedback_fn("  - ERROR: %s" % msg)
733

    
734
    vg_name = self.cfg.GetVGName()
735
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
736
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
737
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
738
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
739
    i_non_redundant = [] # Non redundant instances
740
    i_non_a_balanced = [] # Non auto-balanced instances
741
    node_volume = {}
742
    node_instance = {}
743
    node_info = {}
744
    instance_cfg = {}
745

    
746
    # FIXME: verify OS list
747
    # do local checksums
748
    file_names = []
749
    file_names.append(constants.SSL_CERT_FILE)
750
    file_names.append(constants.CLUSTER_CONF_FILE)
751
    local_checksums = utils.FingerprintFiles(file_names)
752

    
753
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
754
    all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
755
    all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
756
    all_vglist = self.rpc.call_vg_list(nodelist)
757
    node_verify_param = {
758
      'filelist': file_names,
759
      'nodelist': nodelist,
760
      'hypervisor': hypervisors,
761
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
762
                        for node in nodeinfo]
763
      }
764
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
765
                                           self.cfg.GetClusterName())
766
    all_rversion = self.rpc.call_version(nodelist)
767
    all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
768
                                        self.cfg.GetHypervisorType())
769

    
770
    cluster = self.cfg.GetClusterInfo()
771
    for node in nodelist:
772
      feedback_fn("* Verifying node %s" % node)
773
      result = self._VerifyNode(node, file_names, local_checksums,
774
                                all_vglist[node], all_nvinfo[node],
775
                                all_rversion[node], feedback_fn)
776
      bad = bad or result
777

    
778
      # node_volume
779
      volumeinfo = all_volumeinfo[node]
780

    
781
      if isinstance(volumeinfo, basestring):
782
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
783
                    (node, volumeinfo[-400:].encode('string_escape')))
784
        bad = True
785
        node_volume[node] = {}
786
      elif not isinstance(volumeinfo, dict):
787
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
788
        bad = True
789
        continue
790
      else:
791
        node_volume[node] = volumeinfo
792

    
793
      # node_instance
794
      nodeinstance = all_instanceinfo[node]
795
      if type(nodeinstance) != list:
796
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
797
        bad = True
798
        continue
799

    
800
      node_instance[node] = nodeinstance
801

    
802
      # node_info
803
      nodeinfo = all_ninfo[node]
804
      if not isinstance(nodeinfo, dict):
805
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
806
        bad = True
807
        continue
808

    
809
      try:
810
        node_info[node] = {
811
          "mfree": int(nodeinfo['memory_free']),
812
          "dfree": int(nodeinfo['vg_free']),
813
          "pinst": [],
814
          "sinst": [],
815
          # dictionary holding all instances this node is secondary for,
816
          # grouped by their primary node. Each key is a cluster node, and each
817
          # value is a list of instances which have the key as primary and the
818
          # current node as secondary.  this is handy to calculate N+1 memory
819
          # availability if you can only failover from a primary to its
820
          # secondary.
821
          "sinst-by-pnode": {},
822
        }
823
      except ValueError:
824
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
825
        bad = True
826
        continue
827

    
828
    node_vol_should = {}
829

    
830
    for instance in instancelist:
831
      feedback_fn("* Verifying instance %s" % instance)
832
      inst_config = self.cfg.GetInstanceInfo(instance)
833
      result =  self._VerifyInstance(instance, inst_config, node_volume,
834
                                     node_instance, feedback_fn)
835
      bad = bad or result
836

    
837
      inst_config.MapLVsByNode(node_vol_should)
838

    
839
      instance_cfg[instance] = inst_config
840

    
841
      pnode = inst_config.primary_node
842
      if pnode in node_info:
843
        node_info[pnode]['pinst'].append(instance)
844
      else:
845
        feedback_fn("  - ERROR: instance %s, connection to primary node"
846
                    " %s failed" % (instance, pnode))
847
        bad = True
848

    
849
      # If the instance is non-redundant we cannot survive losing its primary
850
      # node, so we are not N+1 compliant. On the other hand we have no disk
851
      # templates with more than one secondary so that situation is not well
852
      # supported either.
853
      # FIXME: does not support file-backed instances
854
      if len(inst_config.secondary_nodes) == 0:
855
        i_non_redundant.append(instance)
856
      elif len(inst_config.secondary_nodes) > 1:
857
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
858
                    % instance)
859

    
860
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
861
        i_non_a_balanced.append(instance)
862

    
863
      for snode in inst_config.secondary_nodes:
864
        if snode in node_info:
865
          node_info[snode]['sinst'].append(instance)
866
          if pnode not in node_info[snode]['sinst-by-pnode']:
867
            node_info[snode]['sinst-by-pnode'][pnode] = []
868
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
869
        else:
870
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
871
                      " %s failed" % (instance, snode))
872

    
873
    feedback_fn("* Verifying orphan volumes")
874
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
875
                                       feedback_fn)
876
    bad = bad or result
877

    
878
    feedback_fn("* Verifying remaining instances")
879
    result = self._VerifyOrphanInstances(instancelist, node_instance,
880
                                         feedback_fn)
881
    bad = bad or result
882

    
883
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
884
      feedback_fn("* Verifying N+1 Memory redundancy")
885
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
886
      bad = bad or result
887

    
888
    feedback_fn("* Other Notes")
889
    if i_non_redundant:
890
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
891
                  % len(i_non_redundant))
892

    
893
    if i_non_a_balanced:
894
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
895
                  % len(i_non_a_balanced))
896

    
897
    return not bad
898

    
899
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
900
    """Analize the post-hooks' result, handle it, and send some
901
    nicely-formatted feedback back to the user.
902

903
    Args:
904
      phase: the hooks phase that has just been run
905
      hooks_results: the results of the multi-node hooks rpc call
906
      feedback_fn: function to send feedback back to the caller
907
      lu_result: previous Exec result
908

909
    """
910
    # We only really run POST phase hooks, and are only interested in
911
    # their results
912
    if phase == constants.HOOKS_PHASE_POST:
913
      # Used to change hooks' output to proper indentation
914
      indent_re = re.compile('^', re.M)
915
      feedback_fn("* Hooks Results")
916
      if not hooks_results:
917
        feedback_fn("  - ERROR: general communication failure")
918
        lu_result = 1
919
      else:
920
        for node_name in hooks_results:
921
          show_node_header = True
922
          res = hooks_results[node_name]
923
          if res is False or not isinstance(res, list):
924
            feedback_fn("    Communication failure")
925
            lu_result = 1
926
            continue
927
          for script, hkr, output in res:
928
            if hkr == constants.HKR_FAIL:
929
              # The node header is only shown once, if there are
930
              # failing hooks on that node
931
              if show_node_header:
932
                feedback_fn("  Node %s:" % node_name)
933
                show_node_header = False
934
              feedback_fn("    ERROR: Script %s failed, output:" % script)
935
              output = indent_re.sub('      ', output)
936
              feedback_fn("%s" % output)
937
              lu_result = 1
938

    
939
      return lu_result
940

    
941

    
942
class LUVerifyDisks(NoHooksLU):
943
  """Verifies the cluster disks status.
944

945
  """
946
  _OP_REQP = []
947
  REQ_BGL = False
948

    
949
  def ExpandNames(self):
950
    self.needed_locks = {
951
      locking.LEVEL_NODE: locking.ALL_SET,
952
      locking.LEVEL_INSTANCE: locking.ALL_SET,
953
    }
954
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
955

    
956
  def CheckPrereq(self):
957
    """Check prerequisites.
958

959
    This has no prerequisites.
960

961
    """
962
    pass
963

    
964
  def Exec(self, feedback_fn):
965
    """Verify integrity of cluster disks.
966

967
    """
968
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
969

    
970
    vg_name = self.cfg.GetVGName()
971
    nodes = utils.NiceSort(self.cfg.GetNodeList())
972
    instances = [self.cfg.GetInstanceInfo(name)
973
                 for name in self.cfg.GetInstanceList()]
974

    
975
    nv_dict = {}
976
    for inst in instances:
977
      inst_lvs = {}
978
      if (inst.status != "up" or
979
          inst.disk_template not in constants.DTS_NET_MIRROR):
980
        continue
981
      inst.MapLVsByNode(inst_lvs)
982
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
983
      for node, vol_list in inst_lvs.iteritems():
984
        for vol in vol_list:
985
          nv_dict[(node, vol)] = inst
986

    
987
    if not nv_dict:
988
      return result
989

    
990
    node_lvs = self.rpc.call_volume_list(nodes, vg_name)
991

    
992
    to_act = set()
993
    for node in nodes:
994
      # node_volume
995
      lvs = node_lvs[node]
996

    
997
      if isinstance(lvs, basestring):
998
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
999
        res_nlvm[node] = lvs
1000
      elif not isinstance(lvs, dict):
1001
        logger.Info("connection to node %s failed or invalid data returned" %
1002
                    (node,))
1003
        res_nodes.append(node)
1004
        continue
1005

    
1006
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1007
        inst = nv_dict.pop((node, lv_name), None)
1008
        if (not lv_online and inst is not None
1009
            and inst.name not in res_instances):
1010
          res_instances.append(inst.name)
1011

    
1012
    # any leftover items in nv_dict are missing LVs, let's arrange the
1013
    # data better
1014
    for key, inst in nv_dict.iteritems():
1015
      if inst.name not in res_missing:
1016
        res_missing[inst.name] = []
1017
      res_missing[inst.name].append(key)
1018

    
1019
    return result
1020

    
1021

    
1022
class LURenameCluster(LogicalUnit):
1023
  """Rename the cluster.
1024

1025
  """
1026
  HPATH = "cluster-rename"
1027
  HTYPE = constants.HTYPE_CLUSTER
1028
  _OP_REQP = ["name"]
1029

    
1030
  def BuildHooksEnv(self):
1031
    """Build hooks env.
1032

1033
    """
1034
    env = {
1035
      "OP_TARGET": self.cfg.GetClusterName(),
1036
      "NEW_NAME": self.op.name,
1037
      }
1038
    mn = self.cfg.GetMasterNode()
1039
    return env, [mn], [mn]
1040

    
1041
  def CheckPrereq(self):
1042
    """Verify that the passed name is a valid one.
1043

1044
    """
1045
    hostname = utils.HostInfo(self.op.name)
1046

    
1047
    new_name = hostname.name
1048
    self.ip = new_ip = hostname.ip
1049
    old_name = self.cfg.GetClusterName()
1050
    old_ip = self.cfg.GetMasterIP()
1051
    if new_name == old_name and new_ip == old_ip:
1052
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1053
                                 " cluster has changed")
1054
    if new_ip != old_ip:
1055
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1056
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1057
                                   " reachable on the network. Aborting." %
1058
                                   new_ip)
1059

    
1060
    self.op.name = new_name
1061

    
1062
  def Exec(self, feedback_fn):
1063
    """Rename the cluster.
1064

1065
    """
1066
    clustername = self.op.name
1067
    ip = self.ip
1068

    
1069
    # shutdown the master IP
1070
    master = self.cfg.GetMasterNode()
1071
    if not self.rpc.call_node_stop_master(master, False):
1072
      raise errors.OpExecError("Could not disable the master role")
1073

    
1074
    try:
1075
      # modify the sstore
1076
      # TODO: sstore
1077
      ss.SetKey(ss.SS_MASTER_IP, ip)
1078
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1079

    
1080
      # Distribute updated ss config to all nodes
1081
      myself = self.cfg.GetNodeInfo(master)
1082
      dist_nodes = self.cfg.GetNodeList()
1083
      if myself.name in dist_nodes:
1084
        dist_nodes.remove(myself.name)
1085

    
1086
      logger.Debug("Copying updated ssconf data to all nodes")
1087
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1088
        fname = ss.KeyToFilename(keyname)
1089
        result = self.rpc.call_upload_file(dist_nodes, fname)
1090
        for to_node in dist_nodes:
1091
          if not result[to_node]:
1092
            logger.Error("copy of file %s to node %s failed" %
1093
                         (fname, to_node))
1094
    finally:
1095
      if not self.rpc.call_node_start_master(master, False):
1096
        logger.Error("Could not re-enable the master role on the master,"
1097
                     " please restart manually.")
1098

    
1099

    
1100
def _RecursiveCheckIfLVMBased(disk):
1101
  """Check if the given disk or its children are lvm-based.
1102

1103
  Args:
1104
    disk: ganeti.objects.Disk object
1105

1106
  Returns:
1107
    boolean indicating whether a LD_LV dev_type was found or not
1108

1109
  """
1110
  if disk.children:
1111
    for chdisk in disk.children:
1112
      if _RecursiveCheckIfLVMBased(chdisk):
1113
        return True
1114
  return disk.dev_type == constants.LD_LV
1115

    
1116

    
1117
class LUSetClusterParams(LogicalUnit):
1118
  """Change the parameters of the cluster.
1119

1120
  """
1121
  HPATH = "cluster-modify"
1122
  HTYPE = constants.HTYPE_CLUSTER
1123
  _OP_REQP = []
1124
  REQ_BGL = False
1125

    
1126
  def ExpandNames(self):
1127
    # FIXME: in the future maybe other cluster params won't require checking on
1128
    # all nodes to be modified.
1129
    self.needed_locks = {
1130
      locking.LEVEL_NODE: locking.ALL_SET,
1131
    }
1132
    self.share_locks[locking.LEVEL_NODE] = 1
1133

    
1134
  def BuildHooksEnv(self):
1135
    """Build hooks env.
1136

1137
    """
1138
    env = {
1139
      "OP_TARGET": self.cfg.GetClusterName(),
1140
      "NEW_VG_NAME": self.op.vg_name,
1141
      }
1142
    mn = self.cfg.GetMasterNode()
1143
    return env, [mn], [mn]
1144

    
1145
  def CheckPrereq(self):
1146
    """Check prerequisites.
1147

1148
    This checks whether the given params don't conflict and
1149
    if the given volume group is valid.
1150

1151
    """
1152
    # FIXME: This only works because there is only one parameter that can be
1153
    # changed or removed.
1154
    if not self.op.vg_name:
1155
      instances = self.cfg.GetAllInstancesInfo().values()
1156
      for inst in instances:
1157
        for disk in inst.disks:
1158
          if _RecursiveCheckIfLVMBased(disk):
1159
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1160
                                       " lvm-based instances exist")
1161

    
1162
    # if vg_name not None, checks given volume group on all nodes
1163
    if self.op.vg_name:
1164
      node_list = self.acquired_locks[locking.LEVEL_NODE]
1165
      vglist = self.rpc.call_vg_list(node_list)
1166
      for node in node_list:
1167
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1168
                                              constants.MIN_VG_SIZE)
1169
        if vgstatus:
1170
          raise errors.OpPrereqError("Error on node '%s': %s" %
1171
                                     (node, vgstatus))
1172

    
1173
  def Exec(self, feedback_fn):
1174
    """Change the parameters of the cluster.
1175

1176
    """
1177
    if self.op.vg_name != self.cfg.GetVGName():
1178
      self.cfg.SetVGName(self.op.vg_name)
1179
    else:
1180
      feedback_fn("Cluster LVM configuration already in desired"
1181
                  " state, not changing")
1182

    
1183

    
1184
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1185
  """Sleep and poll for an instance's disk to sync.
1186

1187
  """
1188
  if not instance.disks:
1189
    return True
1190

    
1191
  if not oneshot:
1192
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1193

    
1194
  node = instance.primary_node
1195

    
1196
  for dev in instance.disks:
1197
    lu.cfg.SetDiskID(dev, node)
1198

    
1199
  retries = 0
1200
  while True:
1201
    max_time = 0
1202
    done = True
1203
    cumul_degraded = False
1204
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1205
    if not rstats:
1206
      lu.proc.LogWarning("Can't get any data from node %s" % node)
1207
      retries += 1
1208
      if retries >= 10:
1209
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1210
                                 " aborting." % node)
1211
      time.sleep(6)
1212
      continue
1213
    retries = 0
1214
    for i in range(len(rstats)):
1215
      mstat = rstats[i]
1216
      if mstat is None:
1217
        lu.proc.LogWarning("Can't compute data for node %s/%s" %
1218
                           (node, instance.disks[i].iv_name))
1219
        continue
1220
      # we ignore the ldisk parameter
1221
      perc_done, est_time, is_degraded, _ = mstat
1222
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1223
      if perc_done is not None:
1224
        done = False
1225
        if est_time is not None:
1226
          rem_time = "%d estimated seconds remaining" % est_time
1227
          max_time = est_time
1228
        else:
1229
          rem_time = "no time estimate"
1230
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1231
                        (instance.disks[i].iv_name, perc_done, rem_time))
1232
    if done or oneshot:
1233
      break
1234

    
1235
    time.sleep(min(60, max_time))
1236

    
1237
  if done:
1238
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1239
  return not cumul_degraded
1240

    
1241

    
1242
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1243
  """Check that mirrors are not degraded.
1244

1245
  The ldisk parameter, if True, will change the test from the
1246
  is_degraded attribute (which represents overall non-ok status for
1247
  the device(s)) to the ldisk (representing the local storage status).
1248

1249
  """
1250
  lu.cfg.SetDiskID(dev, node)
1251
  if ldisk:
1252
    idx = 6
1253
  else:
1254
    idx = 5
1255

    
1256
  result = True
1257
  if on_primary or dev.AssembleOnSecondary():
1258
    rstats = lu.rpc.call_blockdev_find(node, dev)
1259
    if not rstats:
1260
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1261
      result = False
1262
    else:
1263
      result = result and (not rstats[idx])
1264
  if dev.children:
1265
    for child in dev.children:
1266
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1267

    
1268
  return result
1269

    
1270

    
1271
class LUDiagnoseOS(NoHooksLU):
1272
  """Logical unit for OS diagnose/query.
1273

1274
  """
1275
  _OP_REQP = ["output_fields", "names"]
1276
  REQ_BGL = False
1277

    
1278
  def ExpandNames(self):
1279
    if self.op.names:
1280
      raise errors.OpPrereqError("Selective OS query not supported")
1281

    
1282
    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1283
    _CheckOutputFields(static=[],
1284
                       dynamic=self.dynamic_fields,
1285
                       selected=self.op.output_fields)
1286

    
1287
    # Lock all nodes, in shared mode
1288
    self.needed_locks = {}
1289
    self.share_locks[locking.LEVEL_NODE] = 1
1290
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1291

    
1292
  def CheckPrereq(self):
1293
    """Check prerequisites.
1294

1295
    """
1296

    
1297
  @staticmethod
1298
  def _DiagnoseByOS(node_list, rlist):
1299
    """Remaps a per-node return list into an a per-os per-node dictionary
1300

1301
      Args:
1302
        node_list: a list with the names of all nodes
1303
        rlist: a map with node names as keys and OS objects as values
1304

1305
      Returns:
1306
        map: a map with osnames as keys and as value another map, with
1307
             nodes as
1308
             keys and list of OS objects as values
1309
             e.g. {"debian-etch": {"node1": [<object>,...],
1310
                                   "node2": [<object>,]}
1311
                  }
1312

1313
    """
1314
    all_os = {}
1315
    for node_name, nr in rlist.iteritems():
1316
      if not nr:
1317
        continue
1318
      for os_obj in nr:
1319
        if os_obj.name not in all_os:
1320
          # build a list of nodes for this os containing empty lists
1321
          # for each node in node_list
1322
          all_os[os_obj.name] = {}
1323
          for nname in node_list:
1324
            all_os[os_obj.name][nname] = []
1325
        all_os[os_obj.name][node_name].append(os_obj)
1326
    return all_os
1327

    
1328
  def Exec(self, feedback_fn):
1329
    """Compute the list of OSes.
1330

1331
    """
1332
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1333
    node_data = self.rpc.call_os_diagnose(node_list)
1334
    if node_data == False:
1335
      raise errors.OpExecError("Can't gather the list of OSes")
1336
    pol = self._DiagnoseByOS(node_list, node_data)
1337
    output = []
1338
    for os_name, os_data in pol.iteritems():
1339
      row = []
1340
      for field in self.op.output_fields:
1341
        if field == "name":
1342
          val = os_name
1343
        elif field == "valid":
1344
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1345
        elif field == "node_status":
1346
          val = {}
1347
          for node_name, nos_list in os_data.iteritems():
1348
            val[node_name] = [(v.status, v.path) for v in nos_list]
1349
        else:
1350
          raise errors.ParameterError(field)
1351
        row.append(val)
1352
      output.append(row)
1353

    
1354
    return output
1355

    
1356

    
1357
class LURemoveNode(LogicalUnit):
1358
  """Logical unit for removing a node.
1359

1360
  """
1361
  HPATH = "node-remove"
1362
  HTYPE = constants.HTYPE_NODE
1363
  _OP_REQP = ["node_name"]
1364

    
1365
  def BuildHooksEnv(self):
1366
    """Build hooks env.
1367

1368
    This doesn't run on the target node in the pre phase as a failed
1369
    node would then be impossible to remove.
1370

1371
    """
1372
    env = {
1373
      "OP_TARGET": self.op.node_name,
1374
      "NODE_NAME": self.op.node_name,
1375
      }
1376
    all_nodes = self.cfg.GetNodeList()
1377
    all_nodes.remove(self.op.node_name)
1378
    return env, all_nodes, all_nodes
1379

    
1380
  def CheckPrereq(self):
1381
    """Check prerequisites.
1382

1383
    This checks:
1384
     - the node exists in the configuration
1385
     - it does not have primary or secondary instances
1386
     - it's not the master
1387

1388
    Any errors are signalled by raising errors.OpPrereqError.
1389

1390
    """
1391
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1392
    if node is None:
1393
      raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1394

    
1395
    instance_list = self.cfg.GetInstanceList()
1396

    
1397
    masternode = self.cfg.GetMasterNode()
1398
    if node.name == masternode:
1399
      raise errors.OpPrereqError("Node is the master node,"
1400
                                 " you need to failover first.")
1401

    
1402
    for instance_name in instance_list:
1403
      instance = self.cfg.GetInstanceInfo(instance_name)
1404
      if node.name == instance.primary_node:
1405
        raise errors.OpPrereqError("Instance %s still running on the node,"
1406
                                   " please remove first." % instance_name)
1407
      if node.name in instance.secondary_nodes:
1408
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1409
                                   " please remove first." % instance_name)
1410
    self.op.node_name = node.name
1411
    self.node = node
1412

    
1413
  def Exec(self, feedback_fn):
1414
    """Removes the node from the cluster.
1415

1416
    """
1417
    node = self.node
1418
    logger.Info("stopping the node daemon and removing configs from node %s" %
1419
                node.name)
1420

    
1421
    self.context.RemoveNode(node.name)
1422

    
1423
    self.rpc.call_node_leave_cluster(node.name)
1424

    
1425

    
1426
class LUQueryNodes(NoHooksLU):
1427
  """Logical unit for querying nodes.
1428

1429
  """
1430
  _OP_REQP = ["output_fields", "names"]
1431
  REQ_BGL = False
1432

    
1433
  def ExpandNames(self):
1434
    self.dynamic_fields = frozenset([
1435
      "dtotal", "dfree",
1436
      "mtotal", "mnode", "mfree",
1437
      "bootid",
1438
      "ctotal",
1439
      ])
1440

    
1441
    self.static_fields = frozenset([
1442
      "name", "pinst_cnt", "sinst_cnt",
1443
      "pinst_list", "sinst_list",
1444
      "pip", "sip", "tags",
1445
      "serial_no",
1446
      ])
1447

    
1448
    _CheckOutputFields(static=self.static_fields,
1449
                       dynamic=self.dynamic_fields,
1450
                       selected=self.op.output_fields)
1451

    
1452
    self.needed_locks = {}
1453
    self.share_locks[locking.LEVEL_NODE] = 1
1454

    
1455
    if self.op.names:
1456
      self.wanted = _GetWantedNodes(self, self.op.names)
1457
    else:
1458
      self.wanted = locking.ALL_SET
1459

    
1460
    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
1461
    if self.do_locking:
1462
      # if we don't request only static fields, we need to lock the nodes
1463
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
1464

    
1465

    
1466
  def CheckPrereq(self):
1467
    """Check prerequisites.
1468

1469
    """
1470
    # The validation of the node list is done in the _GetWantedNodes,
1471
    # if non empty, and if empty, there's no validation to do
1472
    pass
1473

    
1474
  def Exec(self, feedback_fn):
1475
    """Computes the list of nodes and their attributes.
1476

1477
    """
1478
    all_info = self.cfg.GetAllNodesInfo()
1479
    if self.do_locking:
1480
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1481
    elif self.wanted != locking.ALL_SET:
1482
      nodenames = self.wanted
1483
      missing = set(nodenames).difference(all_info.keys())
1484
      if missing:
1485
        raise errors.OpExecError(
1486
          "Some nodes were removed before retrieving their data: %s" % missing)
1487
    else:
1488
      nodenames = all_info.keys()
1489

    
1490
    nodenames = utils.NiceSort(nodenames)
1491
    nodelist = [all_info[name] for name in nodenames]
1492

    
1493
    # begin data gathering
1494

    
1495
    if self.dynamic_fields.intersection(self.op.output_fields):
1496
      live_data = {}
1497
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1498
                                          self.cfg.GetHypervisorType())
1499
      for name in nodenames:
1500
        nodeinfo = node_data.get(name, None)
1501
        if nodeinfo:
1502
          live_data[name] = {
1503
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1504
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1505
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1506
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1507
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1508
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1509
            "bootid": nodeinfo['bootid'],
1510
            }
1511
        else:
1512
          live_data[name] = {}
1513
    else:
1514
      live_data = dict.fromkeys(nodenames, {})
1515

    
1516
    node_to_primary = dict([(name, set()) for name in nodenames])
1517
    node_to_secondary = dict([(name, set()) for name in nodenames])
1518

    
1519
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1520
                             "sinst_cnt", "sinst_list"))
1521
    if inst_fields & frozenset(self.op.output_fields):
1522
      instancelist = self.cfg.GetInstanceList()
1523

    
1524
      for instance_name in instancelist:
1525
        inst = self.cfg.GetInstanceInfo(instance_name)
1526
        if inst.primary_node in node_to_primary:
1527
          node_to_primary[inst.primary_node].add(inst.name)
1528
        for secnode in inst.secondary_nodes:
1529
          if secnode in node_to_secondary:
1530
            node_to_secondary[secnode].add(inst.name)
1531

    
1532
    # end data gathering
1533

    
1534
    output = []
1535
    for node in nodelist:
1536
      node_output = []
1537
      for field in self.op.output_fields:
1538
        if field == "name":
1539
          val = node.name
1540
        elif field == "pinst_list":
1541
          val = list(node_to_primary[node.name])
1542
        elif field == "sinst_list":
1543
          val = list(node_to_secondary[node.name])
1544
        elif field == "pinst_cnt":
1545
          val = len(node_to_primary[node.name])
1546
        elif field == "sinst_cnt":
1547
          val = len(node_to_secondary[node.name])
1548
        elif field == "pip":
1549
          val = node.primary_ip
1550
        elif field == "sip":
1551
          val = node.secondary_ip
1552
        elif field == "tags":
1553
          val = list(node.GetTags())
1554
        elif field == "serial_no":
1555
          val = node.serial_no
1556
        elif field in self.dynamic_fields:
1557
          val = live_data[node.name].get(field, None)
1558
        else:
1559
          raise errors.ParameterError(field)
1560
        node_output.append(val)
1561
      output.append(node_output)
1562

    
1563
    return output
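
# Illustrative sketch only -- not used by LUQueryNodes above.  It restates the
# locking decision made in ExpandNames: node locks are needed exactly when the
# requested output contains at least one field that must be read live from the
# nodes rather than from the configuration.  The field sets passed in are
# whatever the caller chooses; no official field list is assumed here.
def _ExampleNodeQueryNeedsLocking(static_fields, dynamic_fields, requested):
  """Return True if a node query for 'requested' fields needs node locks.

  """
  static_fields = frozenset(static_fields)
  dynamic_fields = frozenset(dynamic_fields)
  requested = frozenset(requested)
  unknown = requested - (static_fields | dynamic_fields)
  if unknown:
    # same error class the LU uses for unknown fields
    raise errors.ParameterError(", ".join(unknown))
  # equivalent to "not static_fields.issuperset(requested)" used above
  return bool(requested - static_fields)
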
1564

    
1565

    
1566
class LUQueryNodeVolumes(NoHooksLU):
1567
  """Logical unit for getting volumes on node(s).
1568

1569
  """
1570
  _OP_REQP = ["nodes", "output_fields"]
1571
  REQ_BGL = False
1572

    
1573
  def ExpandNames(self):
1574
    _CheckOutputFields(static=["node"],
1575
                       dynamic=["phys", "vg", "name", "size", "instance"],
1576
                       selected=self.op.output_fields)
1577

    
1578
    self.needed_locks = {}
1579
    self.share_locks[locking.LEVEL_NODE] = 1
1580
    if not self.op.nodes:
1581
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1582
    else:
1583
      self.needed_locks[locking.LEVEL_NODE] = \
1584
        _GetWantedNodes(self, self.op.nodes)
1585

    
1586
  def CheckPrereq(self):
1587
    """Check prerequisites.
1588

1589
    This checks that the fields required are valid output fields.
1590

1591
    """
1592
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1593

    
1594
  def Exec(self, feedback_fn):
1595
    """Computes the list of nodes and their attributes.
1596

1597
    """
1598
    nodenames = self.nodes
1599
    volumes = self.rpc.call_node_volumes(nodenames)
1600

    
1601
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1602
             in self.cfg.GetInstanceList()]
1603

    
1604
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1605

    
1606
    output = []
1607
    for node in nodenames:
1608
      if node not in volumes or not volumes[node]:
1609
        continue
1610

    
1611
      node_vols = volumes[node][:]
1612
      node_vols.sort(key=lambda vol: vol['dev'])
1613

    
1614
      for vol in node_vols:
1615
        node_output = []
1616
        for field in self.op.output_fields:
1617
          if field == "node":
1618
            val = node
1619
          elif field == "phys":
1620
            val = vol['dev']
1621
          elif field == "vg":
1622
            val = vol['vg']
1623
          elif field == "name":
1624
            val = vol['name']
1625
          elif field == "size":
1626
            val = int(float(vol['size']))
1627
          elif field == "instance":
1628
            for inst in ilist:
1629
              if node not in lv_by_node[inst]:
1630
                continue
1631
              if vol['name'] in lv_by_node[inst][node]:
1632
                val = inst.name
1633
                break
1634
            else:
1635
              val = '-'
1636
          else:
1637
            raise errors.ParameterError(field)
1638
          node_output.append(str(val))
1639

    
1640
        output.append(node_output)
1641

    
1642
    return output
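
# Illustrative sketch only -- not called by LUQueryNodeVolumes above.  It
# isolates the "instance" column lookup: given the same lv_by_node structure
# built in Exec (instance object -> {node name: [LV names]}), find which
# instance owns a given LV on a given node.
def _ExampleFindLvOwner(lv_by_node, node, lv_name):
  """Return the owning instance's name for an LV, or '-' if none matches.

  """
  for inst, node_map in lv_by_node.items():
    if lv_name in node_map.get(node, []):
      return inst.name
  return "-"
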
1643

    
1644

    
1645
class LUAddNode(LogicalUnit):
1646
  """Logical unit for adding node to the cluster.
1647

1648
  """
1649
  HPATH = "node-add"
1650
  HTYPE = constants.HTYPE_NODE
1651
  _OP_REQP = ["node_name"]
1652

    
1653
  def BuildHooksEnv(self):
1654
    """Build hooks env.
1655

1656
    This will run on all nodes before, and on all nodes + the new node after.
1657

1658
    """
1659
    env = {
1660
      "OP_TARGET": self.op.node_name,
1661
      "NODE_NAME": self.op.node_name,
1662
      "NODE_PIP": self.op.primary_ip,
1663
      "NODE_SIP": self.op.secondary_ip,
1664
      }
1665
    nodes_0 = self.cfg.GetNodeList()
1666
    nodes_1 = nodes_0 + [self.op.node_name, ]
1667
    return env, nodes_0, nodes_1
1668

    
1669
  def CheckPrereq(self):
1670
    """Check prerequisites.
1671

1672
    This checks:
1673
     - the new node is not already in the config
1674
     - it is resolvable
1675
     - its parameters (single/dual homed) matches the cluster
1676

1677
    Any errors are signalled by raising errors.OpPrereqError.
1678

1679
    """
1680
    node_name = self.op.node_name
1681
    cfg = self.cfg
1682

    
1683
    dns_data = utils.HostInfo(node_name)
1684

    
1685
    node = dns_data.name
1686
    primary_ip = self.op.primary_ip = dns_data.ip
1687
    secondary_ip = getattr(self.op, "secondary_ip", None)
1688
    if secondary_ip is None:
1689
      secondary_ip = primary_ip
1690
    if not utils.IsValidIP(secondary_ip):
1691
      raise errors.OpPrereqError("Invalid secondary IP given")
1692
    self.op.secondary_ip = secondary_ip
1693

    
1694
    node_list = cfg.GetNodeList()
1695
    if not self.op.readd and node in node_list:
1696
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1697
                                 node)
1698
    elif self.op.readd and node not in node_list:
1699
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1700

    
1701
    for existing_node_name in node_list:
1702
      existing_node = cfg.GetNodeInfo(existing_node_name)
1703

    
1704
      if self.op.readd and node == existing_node_name:
1705
        if (existing_node.primary_ip != primary_ip or
1706
            existing_node.secondary_ip != secondary_ip):
1707
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1708
                                     " address configuration as before")
1709
        continue
1710

    
1711
      if (existing_node.primary_ip == primary_ip or
1712
          existing_node.secondary_ip == primary_ip or
1713
          existing_node.primary_ip == secondary_ip or
1714
          existing_node.secondary_ip == secondary_ip):
1715
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1716
                                   " existing node %s" % existing_node.name)
1717

    
1718
    # check that the type of the node (single versus dual homed) is the
1719
    # same as for the master
1720
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1721
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1722
    newbie_singlehomed = secondary_ip == primary_ip
1723
    if master_singlehomed != newbie_singlehomed:
1724
      if master_singlehomed:
1725
        raise errors.OpPrereqError("The master has no private ip but the"
1726
                                   " new node has one")
1727
      else:
1728
        raise errors.OpPrereqError("The master has a private ip but the"
1729
                                   " new node doesn't have one")
1730

    
1731
    # checks reachability
1732
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1733
      raise errors.OpPrereqError("Node not reachable by ping")
1734

    
1735
    if not newbie_singlehomed:
1736
      # check reachability from my secondary ip to newbie's secondary ip
1737
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1738
                           source=myself.secondary_ip):
1739
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1740
                                   " based ping to noded port")
1741

    
1742
    self.new_node = objects.Node(name=node,
1743
                                 primary_ip=primary_ip,
1744
                                 secondary_ip=secondary_ip)
1745

    
1746
  def Exec(self, feedback_fn):
1747
    """Adds the new node to the cluster.
1748

1749
    """
1750
    new_node = self.new_node
1751
    node = new_node.name
1752

    
1753
    # check connectivity
1754
    result = self.rpc.call_version([node])[node]
1755
    if result:
1756
      if constants.PROTOCOL_VERSION == result:
1757
        logger.Info("communication to node %s fine, sw version %s match" %
1758
                    (node, result))
1759
      else:
1760
        raise errors.OpExecError("Version mismatch master version %s,"
1761
                                 " node version %s" %
1762
                                 (constants.PROTOCOL_VERSION, result))
1763
    else:
1764
      raise errors.OpExecError("Cannot get version from the new node")
1765

    
1766
    # setup ssh on node
1767
    logger.Info("copy ssh key to node %s" % node)
1768
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1769
    keyarray = []
1770
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1771
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1772
                priv_key, pub_key]
1773

    
1774
    for i in keyfiles:
1775
      f = open(i, 'r')
1776
      try:
1777
        keyarray.append(f.read())
1778
      finally:
1779
        f.close()
1780

    
1781
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
1782
                                    keyarray[2],
1783
                                    keyarray[3], keyarray[4], keyarray[5])
1784

    
1785
    if not result:
1786
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1787

    
1788
    # Add node to our /etc/hosts, and add key to known_hosts
1789
    utils.AddHostToEtcHosts(new_node.name)
1790

    
1791
    if new_node.secondary_ip != new_node.primary_ip:
1792
      if not self.rpc.call_node_has_ip_address(new_node.name,
1793
                                               new_node.secondary_ip):
1794
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1795
                                 " you gave (%s). Please fix and re-run this"
1796
                                 " command." % new_node.secondary_ip)
1797

    
1798
    node_verify_list = [self.cfg.GetMasterNode()]
1799
    node_verify_param = {
1800
      'nodelist': [node],
1801
      # TODO: do a node-net-test as well?
1802
    }
1803

    
1804
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
1805
                                       self.cfg.GetClusterName())
1806
    for verifier in node_verify_list:
1807
      if not result[verifier]:
1808
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
1809
                                 " for remote verification" % verifier)
1810
      if result[verifier]['nodelist']:
1811
        for failed in result[verifier]['nodelist']:
1812
          feedback_fn("ssh/hostname verification failed %s -> %s" %
1813
                      (verifier, result[verifier]['nodelist'][failed]))
1814
        raise errors.OpExecError("ssh/hostname verification failed.")
1815

    
1816
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1817
    # including the node just added
1818
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
1819
    dist_nodes = self.cfg.GetNodeList()
1820
    if not self.op.readd:
1821
      dist_nodes.append(node)
1822
    if myself.name in dist_nodes:
1823
      dist_nodes.remove(myself.name)
1824

    
1825
    logger.Debug("Copying hosts and known_hosts to all nodes")
1826
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1827
      result = self.rpc.call_upload_file(dist_nodes, fname)
1828
      for to_node in dist_nodes:
1829
        if not result[to_node]:
1830
          logger.Error("copy of file %s to node %s failed" %
1831
                       (fname, to_node))
1832

    
1833
    to_copy = []
1834
    if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
1835
      to_copy.append(constants.VNC_PASSWORD_FILE)
1836
    for fname in to_copy:
1837
      result = self.rpc.call_upload_file([node], fname)
1838
      if not result[node]:
1839
        logger.Error("could not copy file %s to node %s" % (fname, node))
1840

    
1841
    if self.op.readd:
1842
      self.context.ReaddNode(new_node)
1843
    else:
1844
      self.context.AddNode(new_node)
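
# Illustrative sketch only -- not part of LUAddNode.  It restates two small
# pieces of CheckPrereq above as standalone helpers: defaulting the secondary
# IP to the primary one, and the single/dual-homed compatibility rule.  The
# argument names are made up for the example.
def _ExampleNormalizeNodeIPs(primary_ip, secondary_ip=None):
  """Default the secondary IP to the primary one and validate it.

  """
  if secondary_ip is None:
    secondary_ip = primary_ip
  if not utils.IsValidIP(secondary_ip):
    raise errors.OpPrereqError("Invalid secondary IP given")
  return primary_ip, secondary_ip


def _ExampleHomednessMatches(master_pip, master_sip, new_pip, new_sip):
  """Return True if the new node is homed the same way as the master.

  """
  return (master_sip == master_pip) == (new_sip == new_pip)
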
1845

    
1846

    
1847
class LUQueryClusterInfo(NoHooksLU):
1848
  """Query cluster configuration.
1849

1850
  """
1851
  _OP_REQP = []
1852
  REQ_MASTER = False
1853
  REQ_BGL = False
1854

    
1855
  def ExpandNames(self):
1856
    self.needed_locks = {}
1857

    
1858
  def CheckPrereq(self):
1859
    """No prerequsites needed for this LU.
1860

1861
    """
1862
    pass
1863

    
1864
  def Exec(self, feedback_fn):
1865
    """Return cluster config.
1866

1867
    """
1868
    result = {
1869
      "name": self.cfg.GetClusterName(),
1870
      "software_version": constants.RELEASE_VERSION,
1871
      "protocol_version": constants.PROTOCOL_VERSION,
1872
      "config_version": constants.CONFIG_VERSION,
1873
      "os_api_version": constants.OS_API_VERSION,
1874
      "export_version": constants.EXPORT_VERSION,
1875
      "master": self.cfg.GetMasterNode(),
1876
      "architecture": (platform.architecture()[0], platform.machine()),
1877
      "hypervisor_type": self.cfg.GetHypervisorType(),
1878
      "enabled_hypervisors": self.cfg.GetClusterInfo().enabled_hypervisors,
1879
      }
1880

    
1881
    return result
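
# Illustrative sketch only: the dictionary returned by LUQueryClusterInfo.Exec
# above has the shape below.  All values here are made-up placeholders; the
# real ones come from constants.* and the cluster configuration.
_EXAMPLE_CLUSTER_INFO = {
  "name": "cluster.example.com",            # fictional cluster name
  "software_version": "0.0.0",              # placeholder version string
  "protocol_version": 0,                    # placeholder protocol number
  "config_version": 0,                      # placeholder config version
  "os_api_version": 0,                      # placeholder OS API version
  "export_version": 0,                      # placeholder export version
  "master": "node1.example.com",            # fictional master node name
  "architecture": ("64bit", "x86_64"),      # platform.architecture()-style
  "hypervisor_type": "fake-hv",             # fictional hypervisor name
  "enabled_hypervisors": ["fake-hv"],       # fictional hypervisor list
  }
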
1882

    
1883

    
1884
class LUQueryConfigValues(NoHooksLU):
1885
  """Return configuration values.
1886

1887
  """
1888
  _OP_REQP = []
1889
  REQ_BGL = False
1890

    
1891
  def ExpandNames(self):
1892
    self.needed_locks = {}
1893

    
1894
    static_fields = ["cluster_name", "master_node", "drain_flag"]
1895
    _CheckOutputFields(static=static_fields,
1896
                       dynamic=[],
1897
                       selected=self.op.output_fields)
1898

    
1899
  def CheckPrereq(self):
1900
    """No prerequisites.
1901

1902
    """
1903
    pass
1904

    
1905
  def Exec(self, feedback_fn):
1906
    """Dump a representation of the cluster config to the standard output.
1907

1908
    """
1909
    values = []
1910
    for field in self.op.output_fields:
1911
      if field == "cluster_name":
1912
        entry = self.cfg.GetClusterName()
1913
      elif field == "master_node":
1914
        entry = self.cfg.GetMasterNode()
1915
      elif field == "drain_flag":
1916
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1917
      else:
1918
        raise errors.ParameterError(field)
1919
      values.append(entry)
1920
    return values
1921

    
1922

    
1923
class LUActivateInstanceDisks(NoHooksLU):
1924
  """Bring up an instance's disks.
1925

1926
  """
1927
  _OP_REQP = ["instance_name"]
1928
  REQ_BGL = False
1929

    
1930
  def ExpandNames(self):
1931
    self._ExpandAndLockInstance()
1932
    self.needed_locks[locking.LEVEL_NODE] = []
1933
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1934

    
1935
  def DeclareLocks(self, level):
1936
    if level == locking.LEVEL_NODE:
1937
      self._LockInstancesNodes()
1938

    
1939
  def CheckPrereq(self):
1940
    """Check prerequisites.
1941

1942
    This checks that the instance is in the cluster.
1943

1944
    """
1945
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1946
    assert self.instance is not None, \
1947
      "Cannot retrieve locked instance %s" % self.op.instance_name
1948

    
1949
  def Exec(self, feedback_fn):
1950
    """Activate the disks.
1951

1952
    """
1953
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
1954
    if not disks_ok:
1955
      raise errors.OpExecError("Cannot activate block devices")
1956

    
1957
    return disks_info
1958

    
1959

    
1960
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
1961
  """Prepare the block devices for an instance.
1962

1963
  This sets up the block devices on all nodes.
1964

1965
  Args:
1966
    instance: a ganeti.objects.Instance object
1967
    ignore_secondaries: if true, errors on secondary nodes won't result
1968
                        in an error return from the function
1969

1970
  Returns:
1971
    a (disks_ok, device_info) tuple: disks_ok is False if the operation failed,
1972
    device_info is a list of (host, instance_visible_name, node_visible_name)
1973
    tuples with the mapping from node devices to instance devices
1974
  """
1975
  device_info = []
1976
  disks_ok = True
1977
  iname = instance.name
1978
  # With the two passes mechanism we try to reduce the window of
1979
  # opportunity for the race condition of switching DRBD to primary
1980
  # before handshaking occurred, but we do not eliminate it
1981

    
1982
  # The proper fix would be to wait (with some limits) until the
1983
  # connection has been made and drbd transitions from WFConnection
1984
  # into any other network-connected state (Connected, SyncTarget,
1985
  # SyncSource, etc.)
1986

    
1987
  # 1st pass, assemble on all nodes in secondary mode
1988
  for inst_disk in instance.disks:
1989
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1990
      lu.cfg.SetDiskID(node_disk, node)
1991
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
1992
      if not result:
1993
        logger.Error("could not prepare block device %s on node %s"
1994
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1995
        if not ignore_secondaries:
1996
          disks_ok = False
1997

    
1998
  # FIXME: race condition on drbd migration to primary
1999

    
2000
  # 2nd pass, do only the primary node
2001
  for inst_disk in instance.disks:
2002
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2003
      if node != instance.primary_node:
2004
        continue
2005
      lu.cfg.SetDiskID(node_disk, node)
2006
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2007
      if not result:
2008
        logger.Error("could not prepare block device %s on node %s"
2009
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
2010
        disks_ok = False
2011
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
2012

    
2013
  # leave the disks configured for the primary node
2014
  # this is a workaround that would be fixed better by
2015
  # improving the logical/physical id handling
2016
  for disk in instance.disks:
2017
    lu.cfg.SetDiskID(disk, instance.primary_node)
2018

    
2019
  return disks_ok, device_info
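
# Illustrative sketch only -- not used by the function above.  It reduces the
# two-pass assembly to the node selection it implies: pass one visits every
# node in each disk's node tree, pass two revisits only the primary node.
# 'disks' is any iterable of objects offering ComputeNodeTree(primary), as the
# real disk objects above do.
def _ExampleTwoPassNodeOrder(disks, primary_node):
  """Return (pass_number, node) pairs in the order used for assembly.

  """
  order = []
  for disk in disks:
    for node, dummy in disk.ComputeNodeTree(primary_node):
      order.append((1, node))
  for disk in disks:
    for node, dummy in disk.ComputeNodeTree(primary_node):
      if node == primary_node:
        order.append((2, node))
  return order
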
2020

    
2021

    
2022
def _StartInstanceDisks(lu, instance, force):
2023
  """Start the disks of an instance.
2024

2025
  """
2026
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2027
                                           ignore_secondaries=force)
2028
  if not disks_ok:
2029
    _ShutdownInstanceDisks(lu, instance)
2030
    if force is not None and not force:
2031
      logger.Error("If the message above refers to a secondary node,"
2032
                   " you can retry the operation using '--force'.")
2033
    raise errors.OpExecError("Disk consistency error")
2034

    
2035

    
2036
class LUDeactivateInstanceDisks(NoHooksLU):
2037
  """Shutdown an instance's disks.
2038

2039
  """
2040
  _OP_REQP = ["instance_name"]
2041
  REQ_BGL = False
2042

    
2043
  def ExpandNames(self):
2044
    self._ExpandAndLockInstance()
2045
    self.needed_locks[locking.LEVEL_NODE] = []
2046
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2047

    
2048
  def DeclareLocks(self, level):
2049
    if level == locking.LEVEL_NODE:
2050
      self._LockInstancesNodes()
2051

    
2052
  def CheckPrereq(self):
2053
    """Check prerequisites.
2054

2055
    This checks that the instance is in the cluster.
2056

2057
    """
2058
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2059
    assert self.instance is not None, \
2060
      "Cannot retrieve locked instance %s" % self.op.instance_name
2061

    
2062
  def Exec(self, feedback_fn):
2063
    """Deactivate the disks
2064

2065
    """
2066
    instance = self.instance
2067
    _SafeShutdownInstanceDisks(self, instance)
2068

    
2069

    
2070
def _SafeShutdownInstanceDisks(lu, instance):
2071
  """Shutdown block devices of an instance.
2072

2073
  This function checks if an instance is running, before calling
2074
  _ShutdownInstanceDisks.
2075

2076
  """
2077
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
2078
                                      [instance.hypervisor])
2079
  ins_l = ins_l[instance.primary_node]
2080
  if not type(ins_l) is list:
2081
    raise errors.OpExecError("Can't contact node '%s'" %
2082
                             instance.primary_node)
2083

    
2084
  if instance.name in ins_l:
2085
    raise errors.OpExecError("Instance is running, can't shutdown"
2086
                             " block devices.")
2087

    
2088
  _ShutdownInstanceDisks(lu, instance)
2089

    
2090

    
2091
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2092
  """Shutdown block devices of an instance.
2093

2094
  This does the shutdown on all nodes of the instance.
2095

2096
  If ignore_primary is true, errors on the primary node are
2097
  ignored.
2098

2099
  """
2100
  result = True
2101
  for disk in instance.disks:
2102
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2103
      lu.cfg.SetDiskID(top_disk, node)
2104
      if not lu.rpc.call_blockdev_shutdown(node, top_disk):
2105
        logger.Error("could not shutdown block device %s on node %s" %
2106
                     (disk.iv_name, node))
2107
        if not ignore_primary or node != instance.primary_node:
2108
          result = False
2109
  return result
2110

    
2111

    
2112
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2113
  """Checks if a node has enough free memory.
2114

2115
  This function checks if a given node has the needed amount of free
2116
  memory. In case the node has less memory or we cannot get the
2117
  information from the node, this function raises an OpPrereqError
2118
  exception.
2119

2120
  @type lu: C{LogicalUnit}
2121
  @param lu: a logical unit from which we get configuration data
2122
  @type node: C{str}
2123
  @param node: the node to check
2124
  @type reason: C{str}
2125
  @param reason: string to use in the error message
2126
  @type requested: C{int}
2127
  @param requested: the amount of memory in MiB to check for
2128
  @type hypervisor: C{str}
2129
  @param hypervisor: the hypervisor to ask for memory stats
2130
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2131
      we cannot check the node
2132

2133
  """
2134
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2135
  if not nodeinfo or not isinstance(nodeinfo, dict):
2136
    raise errors.OpPrereqError("Could not contact node %s for resource"
2137
                             " information" % (node,))
2138

    
2139
  free_mem = nodeinfo[node].get('memory_free')
2140
  if not isinstance(free_mem, int):
2141
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2142
                             " was '%s'" % (node, free_mem))
2143
  if requested > free_mem:
2144
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2145
                             " needed %s MiB, available %s MiB" %
2146
                             (node, reason, requested, free_mem))
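
# Illustrative usage sketch only.  The start and failover LUs below call the
# check like this, with the memory requirement taken from the instance's
# filled-in backend parameters; 'lu' and 'instance' are whatever LU and
# instance objects the caller already has.
def _ExampleCheckStartupMemory(lu, instance):
  """Check that the primary node can hold the instance's memory.

  """
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
  _CheckNodeFreeMemory(lu, instance.primary_node,
                       "starting instance %s" % instance.name,
                       bep[constants.BE_MEMORY], instance.hypervisor)
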
2147

    
2148

    
2149
class LUStartupInstance(LogicalUnit):
2150
  """Starts an instance.
2151

2152
  """
2153
  HPATH = "instance-start"
2154
  HTYPE = constants.HTYPE_INSTANCE
2155
  _OP_REQP = ["instance_name", "force"]
2156
  REQ_BGL = False
2157

    
2158
  def ExpandNames(self):
2159
    self._ExpandAndLockInstance()
2160
    self.needed_locks[locking.LEVEL_NODE] = []
2161
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2162

    
2163
  def DeclareLocks(self, level):
2164
    if level == locking.LEVEL_NODE:
2165
      self._LockInstancesNodes()
2166

    
2167
  def BuildHooksEnv(self):
2168
    """Build hooks env.
2169

2170
    This runs on master, primary and secondary nodes of the instance.
2171

2172
    """
2173
    env = {
2174
      "FORCE": self.op.force,
2175
      }
2176
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2177
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2178
          list(self.instance.secondary_nodes))
2179
    return env, nl, nl
2180

    
2181
  def CheckPrereq(self):
2182
    """Check prerequisites.
2183

2184
    This checks that the instance is in the cluster.
2185

2186
    """
2187
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2188
    assert self.instance is not None, \
2189
      "Cannot retrieve locked instance %s" % self.op.instance_name
2190

    
2191
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2192
    # check bridges existence
2193
    _CheckInstanceBridgesExist(self, instance)
2194

    
2195
    _CheckNodeFreeMemory(self, instance.primary_node,
2196
                         "starting instance %s" % instance.name,
2197
                         bep[constants.BE_MEMORY], instance.hypervisor)
2198

    
2199
  def Exec(self, feedback_fn):
2200
    """Start the instance.
2201

2202
    """
2203
    instance = self.instance
2204
    force = self.op.force
2205
    extra_args = getattr(self.op, "extra_args", "")
2206

    
2207
    self.cfg.MarkInstanceUp(instance.name)
2208

    
2209
    node_current = instance.primary_node
2210

    
2211
    _StartInstanceDisks(self, instance, force)
2212

    
2213
    if not self.rpc.call_instance_start(node_current, instance, extra_args):
2214
      _ShutdownInstanceDisks(self, instance)
2215
      raise errors.OpExecError("Could not start instance")
2216

    
2217

    
2218
class LURebootInstance(LogicalUnit):
2219
  """Reboot an instance.
2220

2221
  """
2222
  HPATH = "instance-reboot"
2223
  HTYPE = constants.HTYPE_INSTANCE
2224
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2225
  REQ_BGL = False
2226

    
2227
  def ExpandNames(self):
2228
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2229
                                   constants.INSTANCE_REBOOT_HARD,
2230
                                   constants.INSTANCE_REBOOT_FULL]:
2231
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2232
                                  (constants.INSTANCE_REBOOT_SOFT,
2233
                                   constants.INSTANCE_REBOOT_HARD,
2234
                                   constants.INSTANCE_REBOOT_FULL))
2235
    self._ExpandAndLockInstance()
2236
    self.needed_locks[locking.LEVEL_NODE] = []
2237
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2238

    
2239
  def DeclareLocks(self, level):
2240
    if level == locking.LEVEL_NODE:
2241
      primary_only = self.op.reboot_type != constants.INSTANCE_REBOOT_FULL
2242
      self._LockInstancesNodes(primary_only=primary_only)
2243

    
2244
  def BuildHooksEnv(self):
2245
    """Build hooks env.
2246

2247
    This runs on master, primary and secondary nodes of the instance.
2248

2249
    """
2250
    env = {
2251
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2252
      }
2253
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2254
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2255
          list(self.instance.secondary_nodes))
2256
    return env, nl, nl
2257

    
2258
  def CheckPrereq(self):
2259
    """Check prerequisites.
2260

2261
    This checks that the instance is in the cluster.
2262

2263
    """
2264
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2265
    assert self.instance is not None, \
2266
      "Cannot retrieve locked instance %s" % self.op.instance_name
2267

    
2268
    # check bridges existence
2269
    _CheckInstanceBridgesExist(self, instance)
2270

    
2271
  def Exec(self, feedback_fn):
2272
    """Reboot the instance.
2273

2274
    """
2275
    instance = self.instance
2276
    ignore_secondaries = self.op.ignore_secondaries
2277
    reboot_type = self.op.reboot_type
2278
    extra_args = getattr(self.op, "extra_args", "")
2279

    
2280
    node_current = instance.primary_node
2281

    
2282
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2283
                       constants.INSTANCE_REBOOT_HARD]:
2284
      if not self.rpc.call_instance_reboot(node_current, instance,
2285
                                           reboot_type, extra_args):
2286
        raise errors.OpExecError("Could not reboot instance")
2287
    else:
2288
      if not self.rpc.call_instance_shutdown(node_current, instance):
2289
        raise errors.OpExecError("could not shutdown instance for full reboot")
2290
      _ShutdownInstanceDisks(self, instance)
2291
      _StartInstanceDisks(self, instance, ignore_secondaries)
2292
      if not self.rpc.call_instance_start(node_current, instance, extra_args):
2293
        _ShutdownInstanceDisks(self, instance)
2294
        raise errors.OpExecError("Could not start instance for full reboot")
2295

    
2296
    self.cfg.MarkInstanceUp(instance.name)
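
# Illustrative sketch only -- not used by LURebootInstance.  It names the
# branch taken in Exec above: soft and hard reboots are delegated to the node
# daemon in one call, anything else (the full reboot) is done as an explicit
# shutdown, disk cycle and start.
def _ExampleRebootNeedsFullCycle(reboot_type):
  """Return True if the given reboot type is handled as stop-and-start.

  """
  return reboot_type not in (constants.INSTANCE_REBOOT_SOFT,
                             constants.INSTANCE_REBOOT_HARD)
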
2297

    
2298

    
2299
class LUShutdownInstance(LogicalUnit):
2300
  """Shutdown an instance.
2301

2302
  """
2303
  HPATH = "instance-stop"
2304
  HTYPE = constants.HTYPE_INSTANCE
2305
  _OP_REQP = ["instance_name"]
2306
  REQ_BGL = False
2307

    
2308
  def ExpandNames(self):
2309
    self._ExpandAndLockInstance()
2310
    self.needed_locks[locking.LEVEL_NODE] = []
2311
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2312

    
2313
  def DeclareLocks(self, level):
2314
    if level == locking.LEVEL_NODE:
2315
      self._LockInstancesNodes()
2316

    
2317
  def BuildHooksEnv(self):
2318
    """Build hooks env.
2319

2320
    This runs on master, primary and secondary nodes of the instance.
2321

2322
    """
2323
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2324
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2325
          list(self.instance.secondary_nodes))
2326
    return env, nl, nl
2327

    
2328
  def CheckPrereq(self):
2329
    """Check prerequisites.
2330

2331
    This checks that the instance is in the cluster.
2332

2333
    """
2334
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2335
    assert self.instance is not None, \
2336
      "Cannot retrieve locked instance %s" % self.op.instance_name
2337

    
2338
  def Exec(self, feedback_fn):
2339
    """Shutdown the instance.
2340

2341
    """
2342
    instance = self.instance
2343
    node_current = instance.primary_node
2344
    self.cfg.MarkInstanceDown(instance.name)
2345
    if not self.rpc.call_instance_shutdown(node_current, instance):
2346
      logger.Error("could not shutdown instance")
2347

    
2348
    _ShutdownInstanceDisks(self, instance)
2349

    
2350

    
2351
class LUReinstallInstance(LogicalUnit):
2352
  """Reinstall an instance.
2353

2354
  """
2355
  HPATH = "instance-reinstall"
2356
  HTYPE = constants.HTYPE_INSTANCE
2357
  _OP_REQP = ["instance_name"]
2358
  REQ_BGL = False
2359

    
2360
  def ExpandNames(self):
2361
    self._ExpandAndLockInstance()
2362
    self.needed_locks[locking.LEVEL_NODE] = []
2363
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2364

    
2365
  def DeclareLocks(self, level):
2366
    if level == locking.LEVEL_NODE:
2367
      self._LockInstancesNodes()
2368

    
2369
  def BuildHooksEnv(self):
2370
    """Build hooks env.
2371

2372
    This runs on master, primary and secondary nodes of the instance.
2373

2374
    """
2375
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2376
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2377
          list(self.instance.secondary_nodes))
2378
    return env, nl, nl
2379

    
2380
  def CheckPrereq(self):
2381
    """Check prerequisites.
2382

2383
    This checks that the instance is in the cluster and is not running.
2384

2385
    """
2386
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2387
    assert instance is not None, \
2388
      "Cannot retrieve locked instance %s" % self.op.instance_name
2389

    
2390
    if instance.disk_template == constants.DT_DISKLESS:
2391
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2392
                                 self.op.instance_name)
2393
    if instance.status != "down":
2394
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2395
                                 self.op.instance_name)
2396
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2397
                                              instance.name,
2398
                                              instance.hypervisor)
2399
    if remote_info:
2400
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2401
                                 (self.op.instance_name,
2402
                                  instance.primary_node))
2403

    
2404
    self.op.os_type = getattr(self.op, "os_type", None)
2405
    if self.op.os_type is not None:
2406
      # OS verification
2407
      pnode = self.cfg.GetNodeInfo(
2408
        self.cfg.ExpandNodeName(instance.primary_node))
2409
      if pnode is None:
2410
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2411
                                   self.op.pnode)
2412
      os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
2413
      if not os_obj:
2414
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2415
                                   " primary node"  % self.op.os_type)
2416

    
2417
    self.instance = instance
2418

    
2419
  def Exec(self, feedback_fn):
2420
    """Reinstall the instance.
2421

2422
    """
2423
    inst = self.instance
2424

    
2425
    if self.op.os_type is not None:
2426
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2427
      inst.os = self.op.os_type
2428
      self.cfg.Update(inst)
2429

    
2430
    _StartInstanceDisks(self, inst, None)
2431
    try:
2432
      feedback_fn("Running the instance OS create scripts...")
2433
      if not self.rpc.call_instance_os_add(inst.primary_node, inst,
2434
                                           "sda", "sdb"):
2435
        raise errors.OpExecError("Could not install OS for instance %s"
2436
                                 " on node %s" %
2437
                                 (inst.name, inst.primary_node))
2438
    finally:
2439
      _ShutdownInstanceDisks(self, inst)
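
# Illustrative sketch only -- not used by the LUs here.  Reinstall above (and
# rename below) follow the same "activate disks, run an action, always
# deactivate disks" shape; this helper states that pattern once, with the
# action passed in as a callable.
def _ExampleWithInstanceDisks(lu, instance, action):
  """Run action() with the instance's disks activated, then shut them down.

  """
  _StartInstanceDisks(lu, instance, None)
  try:
    return action()
  finally:
    _ShutdownInstanceDisks(lu, instance)
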
2440

    
2441

    
2442
class LURenameInstance(LogicalUnit):
2443
  """Rename an instance.
2444

2445
  """
2446
  HPATH = "instance-rename"
2447
  HTYPE = constants.HTYPE_INSTANCE
2448
  _OP_REQP = ["instance_name", "new_name"]
2449

    
2450
  def BuildHooksEnv(self):
2451
    """Build hooks env.
2452

2453
    This runs on master, primary and secondary nodes of the instance.
2454

2455
    """
2456
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2457
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2458
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2459
          list(self.instance.secondary_nodes))
2460
    return env, nl, nl
2461

    
2462
  def CheckPrereq(self):
2463
    """Check prerequisites.
2464

2465
    This checks that the instance is in the cluster and is not running.
2466

2467
    """
2468
    instance = self.cfg.GetInstanceInfo(
2469
      self.cfg.ExpandInstanceName(self.op.instance_name))
2470
    if instance is None:
2471
      raise errors.OpPrereqError("Instance '%s' not known" %
2472
                                 self.op.instance_name)
2473
    if instance.status != "down":
2474
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2475
                                 self.op.instance_name)
2476
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2477
                                              instance.name,
2478
                                              instance.hypervisor)
2479
    if remote_info:
2480
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2481
                                 (self.op.instance_name,
2482
                                  instance.primary_node))
2483
    self.instance = instance
2484

    
2485
    # new name verification
2486
    name_info = utils.HostInfo(self.op.new_name)
2487

    
2488
    self.op.new_name = new_name = name_info.name
2489
    instance_list = self.cfg.GetInstanceList()
2490
    if new_name in instance_list:
2491
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2492
                                 new_name)
2493

    
2494
    if not getattr(self.op, "ignore_ip", False):
2495
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2496
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2497
                                   (name_info.ip, new_name))
2498

    
2499

    
2500
  def Exec(self, feedback_fn):
2501
    """Reinstall the instance.
2502

2503
    """
2504
    inst = self.instance
2505
    old_name = inst.name
2506

    
2507
    if inst.disk_template == constants.DT_FILE:
2508
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2509

    
2510
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2511
    # Change the instance lock. This is definitely safe while we hold the BGL
2512
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2513
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2514

    
2515
    # re-read the instance from the configuration after rename
2516
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2517

    
2518
    if inst.disk_template == constants.DT_FILE:
2519
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2520
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2521
                                                     old_file_storage_dir,
2522
                                                     new_file_storage_dir)
2523

    
2524
      if not result:
2525
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2526
                                 " directory '%s' to '%s' (but the instance"
2527
                                 " has been renamed in Ganeti)" % (
2528
                                 inst.primary_node, old_file_storage_dir,
2529
                                 new_file_storage_dir))
2530

    
2531
      if not result[0]:
2532
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2533
                                 " (but the instance has been renamed in"
2534
                                 " Ganeti)" % (old_file_storage_dir,
2535
                                               new_file_storage_dir))
2536

    
2537
    _StartInstanceDisks(self, inst, None)
2538
    try:
2539
      if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
2540
                                               old_name,
2541
                                               "sda", "sdb"):
2542
        msg = ("Could not run OS rename script for instance %s on node %s"
2543
               " (but the instance has been renamed in Ganeti)" %
2544
               (inst.name, inst.primary_node))
2545
        logger.Error(msg)
2546
    finally:
2547
      _ShutdownInstanceDisks(self, inst)
2548

    
2549

    
2550
class LURemoveInstance(LogicalUnit):
2551
  """Remove an instance.
2552

2553
  """
2554
  HPATH = "instance-remove"
2555
  HTYPE = constants.HTYPE_INSTANCE
2556
  _OP_REQP = ["instance_name", "ignore_failures"]
2557
  REQ_BGL = False
2558

    
2559
  def ExpandNames(self):
2560
    self._ExpandAndLockInstance()
2561
    self.needed_locks[locking.LEVEL_NODE] = []
2562
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2563

    
2564
  def DeclareLocks(self, level):
2565
    if level == locking.LEVEL_NODE:
2566
      self._LockInstancesNodes()
2567

    
2568
  def BuildHooksEnv(self):
2569
    """Build hooks env.
2570

2571
    This runs on master, primary and secondary nodes of the instance.
2572

2573
    """
2574
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2575
    nl = [self.cfg.GetMasterNode()]
2576
    return env, nl, nl
2577

    
2578
  def CheckPrereq(self):
2579
    """Check prerequisites.
2580

2581
    This checks that the instance is in the cluster.
2582

2583
    """
2584
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2585
    assert self.instance is not None, \
2586
      "Cannot retrieve locked instance %s" % self.op.instance_name
2587

    
2588
  def Exec(self, feedback_fn):
2589
    """Remove the instance.
2590

2591
    """
2592
    instance = self.instance
2593
    logger.Info("shutting down instance %s on node %s" %
2594
                (instance.name, instance.primary_node))
2595

    
2596
    if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
2597
      if self.op.ignore_failures:
2598
        feedback_fn("Warning: can't shutdown instance")
2599
      else:
2600
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2601
                                 (instance.name, instance.primary_node))
2602

    
2603
    logger.Info("removing block devices for instance %s" % instance.name)
2604

    
2605
    if not _RemoveDisks(self, instance):
2606
      if self.op.ignore_failures:
2607
        feedback_fn("Warning: can't remove instance's disks")
2608
      else:
2609
        raise errors.OpExecError("Can't remove instance's disks")
2610

    
2611
    logger.Info("removing instance %s out of cluster config" % instance.name)
2612

    
2613
    self.cfg.RemoveInstance(instance.name)
2614
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2615

    
2616

    
2617
class LUQueryInstances(NoHooksLU):
2618
  """Logical unit for querying instances.
2619

2620
  """
2621
  _OP_REQP = ["output_fields", "names"]
2622
  REQ_BGL = False
2623

    
2624
  def ExpandNames(self):
2625
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2626
    hvp = ["hv/%s" % name for name in constants.HVS_PARAMETERS]
2627
    bep = ["be/%s" % name for name in constants.BES_PARAMETERS]
2628
    self.static_fields = frozenset([
2629
      "name", "os", "pnode", "snodes",
2630
      "admin_state", "admin_ram",
2631
      "disk_template", "ip", "mac", "bridge",
2632
      "sda_size", "sdb_size", "vcpus", "tags",
2633
      "network_port",
2634
      "serial_no", "hypervisor", "hvparams",
2635
      ] + hvp + bep)
2636

    
2637
    _CheckOutputFields(static=self.static_fields,
2638
                       dynamic=self.dynamic_fields,
2639
                       selected=self.op.output_fields)
2640

    
2641
    self.needed_locks = {}
2642
    self.share_locks[locking.LEVEL_INSTANCE] = 1
2643
    self.share_locks[locking.LEVEL_NODE] = 1
2644

    
2645
    if self.op.names:
2646
      self.wanted = _GetWantedInstances(self, self.op.names)
2647
    else:
2648
      self.wanted = locking.ALL_SET
2649

    
2650
    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
2651
    if self.do_locking:
2652
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2653
      self.needed_locks[locking.LEVEL_NODE] = []
2654
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2655

    
2656
  def DeclareLocks(self, level):
2657
    if level == locking.LEVEL_NODE and self.do_locking:
2658
      self._LockInstancesNodes()
2659

    
2660
  def CheckPrereq(self):
2661
    """Check prerequisites.
2662

2663
    """
2664
    pass
2665

    
2666
  def Exec(self, feedback_fn):
2667
    """Computes the list of nodes and their attributes.
2668

2669
    """
2670
    all_info = self.cfg.GetAllInstancesInfo()
2671
    if self.do_locking:
2672
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2673
    elif self.wanted != locking.ALL_SET:
2674
      instance_names = self.wanted
2675
      missing = set(instance_names).difference(all_info.keys())
2676
      if missing:
2677
        raise errors.OpExecError(
2678
          "Some instances were removed before retrieving their data: %s"
2679
          % missing)
2680
    else:
2681
      instance_names = all_info.keys()
2682

    
2683
    instance_names = utils.NiceSort(instance_names)
2684
    instance_list = [all_info[iname] for iname in instance_names]
2685

    
2686
    # begin data gathering
2687

    
2688
    nodes = frozenset([inst.primary_node for inst in instance_list])
2689
    hv_list = list(set([inst.hypervisor for inst in instance_list]))
2690

    
2691
    bad_nodes = []
2692
    if self.dynamic_fields.intersection(self.op.output_fields):
2693
      live_data = {}
2694
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
2695
      for name in nodes:
2696
        result = node_data[name]
2697
        if result:
2698
          live_data.update(result)
2699
        elif result == False:
2700
          bad_nodes.append(name)
2701
        # else no instance is alive
2702
    else:
2703
      live_data = dict([(name, {}) for name in instance_names])
2704

    
2705
    # end data gathering
2706

    
2707
    HVPREFIX = "hv/"
2708
    BEPREFIX = "be/"
2709
    output = []
2710
    for instance in instance_list:
2711
      iout = []
2712
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
2713
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
2714
      for field in self.op.output_fields:
2715
        if field == "name":
2716
          val = instance.name
2717
        elif field == "os":
2718
          val = instance.os
2719
        elif field == "pnode":
2720
          val = instance.primary_node
2721
        elif field == "snodes":
2722
          val = list(instance.secondary_nodes)
2723
        elif field == "admin_state":
2724
          val = (instance.status != "down")
2725
        elif field == "oper_state":
2726
          if instance.primary_node in bad_nodes:
2727
            val = None
2728
          else:
2729
            val = bool(live_data.get(instance.name))
2730
        elif field == "status":
2731
          if instance.primary_node in bad_nodes:
2732
            val = "ERROR_nodedown"
2733
          else:
2734
            running = bool(live_data.get(instance.name))
2735
            if running:
2736
              if instance.status != "down":
2737
                val = "running"
2738
              else:
2739
                val = "ERROR_up"
2740
            else:
2741
              if instance.status != "down":
2742
                val = "ERROR_down"
2743
              else:
2744
                val = "ADMIN_down"
2745
        elif field == "oper_ram":
2746
          if instance.primary_node in bad_nodes:
2747
            val = None
2748
          elif instance.name in live_data:
2749
            val = live_data[instance.name].get("memory", "?")
2750
          else:
2751
            val = "-"
2752
        elif field == "disk_template":
2753
          val = instance.disk_template
2754
        elif field == "ip":
2755
          val = instance.nics[0].ip
2756
        elif field == "bridge":
2757
          val = instance.nics[0].bridge
2758
        elif field == "mac":
2759
          val = instance.nics[0].mac
2760
        elif field == "sda_size" or field == "sdb_size":
2761
          disk = instance.FindDisk(field[:3])
2762
          if disk is None:
2763
            val = None
2764
          else:
2765
            val = disk.size
2766
        elif field == "tags":
2767
          val = list(instance.GetTags())
2768
        elif field == "serial_no":
2769
          val = instance.serial_no
2770
        elif field == "network_port":
2771
          val = instance.network_port
2772
        elif field == "hypervisor":
2773
          val = instance.hypervisor
2774
        elif field == "hvparams":
2775
          val = i_hv
2776
        elif (field.startswith(HVPREFIX) and
2777
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
2778
          val = i_hv.get(field[len(HVPREFIX):], None)
2779
        elif field == "beparams":
2780
          val = i_be
2781
        elif (field.startswith(BEPREFIX) and
2782
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
2783
          val = i_be.get(field[len(BEPREFIX):], None)
2784
        else:
2785
          raise errors.ParameterError(field)
2786
        iout.append(val)
2787
      output.append(iout)
2788

    
2789
    return output
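
# Illustrative sketch only -- not used by LUQueryInstances.  It restates the
# "status" field derivation above as a pure function over three inputs: is the
# primary node unreachable, is the instance actually running, and is it marked
# as "down" in the configuration.
def _ExampleInstanceStatus(node_bad, running, admin_down):
  """Return the status string the query above would report.

  """
  if node_bad:
    return "ERROR_nodedown"
  if running:
    if not admin_down:
      return "running"
    return "ERROR_up"
  if not admin_down:
    return "ERROR_down"
  return "ADMIN_down"
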
2790

    
2791

    
2792
class LUFailoverInstance(LogicalUnit):
2793
  """Failover an instance.
2794

2795
  """
2796
  HPATH = "instance-failover"
2797
  HTYPE = constants.HTYPE_INSTANCE
2798
  _OP_REQP = ["instance_name", "ignore_consistency"]
2799
  REQ_BGL = False
2800

    
2801
  def ExpandNames(self):
2802
    self._ExpandAndLockInstance()
2803
    self.needed_locks[locking.LEVEL_NODE] = []
2804
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2805

    
2806
  def DeclareLocks(self, level):
2807
    if level == locking.LEVEL_NODE:
2808
      self._LockInstancesNodes()
2809

    
2810
  def BuildHooksEnv(self):
2811
    """Build hooks env.
2812

2813
    This runs on master, primary and secondary nodes of the instance.
2814

2815
    """
2816
    env = {
2817
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2818
      }
2819
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2820
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
2821
    return env, nl, nl
2822

    
2823
  def CheckPrereq(self):
2824
    """Check prerequisites.
2825

2826
    This checks that the instance is in the cluster.
2827

2828
    """
2829
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2830
    assert self.instance is not None, \
2831
      "Cannot retrieve locked instance %s" % self.op.instance_name
2832

    
2833
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2834
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2835
      raise errors.OpPrereqError("Instance's disk layout is not"
2836
                                 " network mirrored, cannot failover.")
2837

    
2838
    secondary_nodes = instance.secondary_nodes
2839
    if not secondary_nodes:
2840
      raise errors.ProgrammerError("no secondary node but using "
2841
                                   "a mirrored disk template")
2842

    
2843
    target_node = secondary_nodes[0]
2844
    # check memory requirements on the secondary node
2845
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
2846
                         instance.name, bep[constants.BE_MEMORY],
2847
                         instance.hypervisor)
2848

    
2849
    # check bridge existence
2850
    brlist = [nic.bridge for nic in instance.nics]
2851
    if not self.rpc.call_bridges_exist(target_node, brlist):
2852
      raise errors.OpPrereqError("One or more target bridges %s does not"
2853
                                 " exist on destination node '%s'" %
2854
                                 (brlist, target_node))
2855

    
2856
  def Exec(self, feedback_fn):
2857
    """Failover an instance.
2858

2859
    The failover is done by shutting it down on its present node and
2860
    starting it on the secondary.
2861

2862
    """
2863
    instance = self.instance
2864

    
2865
    source_node = instance.primary_node
2866
    target_node = instance.secondary_nodes[0]
2867

    
2868
    feedback_fn("* checking disk consistency between source and target")
2869
    for dev in instance.disks:
2870
      # for drbd, these are drbd over lvm
2871
      if not _CheckDiskConsistency(self, dev, target_node, False):
2872
        if instance.status == "up" and not self.op.ignore_consistency:
2873
          raise errors.OpExecError("Disk %s is degraded on target node,"
2874
                                   " aborting failover." % dev.iv_name)
2875

    
2876
    feedback_fn("* shutting down instance on source node")
2877
    logger.Info("Shutting down instance %s on node %s" %
2878
                (instance.name, source_node))
2879

    
2880
    if not self.rpc.call_instance_shutdown(source_node, instance):
2881
      if self.op.ignore_consistency:
2882
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2883
                     " anyway. Please make sure node %s is down"  %
2884
                     (instance.name, source_node, source_node))
2885
      else:
2886
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2887
                                 (instance.name, source_node))
2888

    
2889
    feedback_fn("* deactivating the instance's disks on source node")
2890
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
2891
      raise errors.OpExecError("Can't shut down the instance's disks.")
2892

    
2893
    instance.primary_node = target_node
2894
    # distribute new instance config to the other nodes
2895
    self.cfg.Update(instance)
2896

    
2897
    # Only start the instance if it's marked as up
2898
    if instance.status == "up":
2899
      feedback_fn("* activating the instance's disks on target node")
2900
      logger.Info("Starting instance %s on node %s" %
2901
                  (instance.name, target_node))
2902

    
2903
      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
2904
                                               ignore_secondaries=True)
2905
      if not disks_ok:
2906
        _ShutdownInstanceDisks(self, instance)
2907
        raise errors.OpExecError("Can't activate the instance's disks")
2908

    
2909
      feedback_fn("* starting the instance on the target node")
2910
      if not self.rpc.call_instance_start(target_node, instance, None):
2911
        _ShutdownInstanceDisks(self, instance)
2912
        raise errors.OpExecError("Could not start instance %s on node %s." %
2913
                                 (instance.name, target_node))
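
# Illustrative sketch only -- not used by LUFailoverInstance.  It names the
# consistency gate applied above: a degraded disk on the target aborts the
# failover only for an instance marked "up" and only when the
# ignore_consistency override was not given.
def _ExampleFailoverAbortsOnDegraded(instance_up, ignore_consistency):
  """Return True if a degraded target disk must abort the failover.

  """
  return instance_up and not ignore_consistency
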
2914

    
2915

    
2916
def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
2917
  """Create a tree of block devices on the primary node.
2918

2919
  This always creates all devices.
2920

2921
  """
2922
  if device.children:
2923
    for child in device.children:
2924
      if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
2925
        return False
2926

    
2927
  lu.cfg.SetDiskID(device, node)
2928
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
2929
                                       instance.name, True, info)
2930
  if not new_id:
2931
    return False
2932
  if device.physical_id is None:
2933
    device.physical_id = new_id
2934
  return True
2935

    
def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(lu, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  lu.cfg.SetDiskID(device, node)
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True
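# Note on the two helpers above: _CreateBlockDevOnPrimary unconditionally
# creates the whole device tree (children first, then the parent), while
# _CreateBlockDevOnSecondary only starts creating once it reaches a device
# for which CreateOnSecondary() is true (e.g. the LVs backing a DRBD8
# device); above that point it merely recurses.  A rough sketch of the call
# pattern for a hypothetical 'drbd_disk' with two LV children:
#
#   _CreateBlockDevOnSecondary(lu, snode, inst, drbd_disk, False, info)
#     -> recurses into the LV children with force=True (the LVs must exist
#        on the secondary), then creates the DRBD8 device itself.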

    
def _GenerateUniqueNames(lu, exts):
  """Generate a suitable LV name.

  This will generate a unique logical volume name for each of the given
  suffixes.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results
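# Illustrative example (the identifiers are made up; GenerateUniqueID
# returns a fresh cluster-unique string, one per suffix):
#   _GenerateUniqueNames(lu, [".sda_data", ".sda_meta"])
#   -> ["<unique-id-1>.sda_data", "<unique-id-2>.sda_meta"]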

    
def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
                         p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  port = lu.cfg.AllocatePort()
  vgname = lu.cfg.GetVGName()
  shared_secret = lu.cfg.GenerateDRBDSecret()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev
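# The resulting object tree, roughly (sizes in MB; the 128 MB child is the
# DRBD metadata volume):
#
#   Disk(LD_DRBD8, size=size,
#        logical_id=(primary, secondary, port, p_minor, s_minor, secret),
#        children=[Disk(LD_LV, size=size, logical_id=(vg, names[0])),
#                  Disk(LD_LV, size=128,  logical_id=(vg, names[1]))])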

    
def _GenerateDiskTemplate(lu, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  """
  # TODO: compute space requirements

  vgname = lu.cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
    disks = []
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(lu, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name="sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name="sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    (minor_pa, minor_pb,
     minor_sa, minor_sb) = lu.cfg.AllocateDRBDMinor(
      [primary_node, primary_node, remote_node, remote_node], instance_name)

    names = _GenerateUniqueNames(lu, [".sda_data", ".sda_meta",
                                      ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                        disk_sz, names[0:2], "sda",
                                        minor_pa, minor_sa)
    drbd_sdb_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                        swap_sz, names[2:4], "sdb",
                                        minor_pb, minor_sb)
    disks = [drbd_sda_dev, drbd_sdb_dev]
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
                                iv_name="sda", logical_id=(file_driver,
                                "%s/sda" % file_storage_dir))
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
                                iv_name="sdb", logical_id=(file_driver,
                                "%s/sdb" % file_storage_dir))
    disks = [file_sda_dev, file_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks
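# Shape of the return value, as the branches above imply: for drbd8 a
# two-element list, one DRBD8 disk for "sda" (data, disk_sz MB) and one for
# "sdb" (swap, swap_sz MB), each carrying its own pair of backing LVs as
# children; for DT_PLAIN and DT_FILE a list of two plain LV/file disks; and
# for DT_DISKLESS an empty list.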

    
def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name
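# Example: for an instance named "instance1.example.com" this yields the
# metadata string "originstname+instance1.example.com".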

    
def _CreateDisks(lu, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
                                                 file_storage_dir)

    if not result:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False

    if not result[0]:
      logger.Error("failed to create directory '%s'" % file_storage_dir)
      return False

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False

  return True
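# Creation order in the loop above: for each disk the secondary node(s) are
# handled first, and only then is the full tree forced into existence on the
# primary node, so that by the time the primary-side device is assembled its
# remote counterparts already exist.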

    
def _RemoveDisks(lu, instance):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(disk, node)
      if not lu.rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
                                               file_storage_dir):
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      result = False

  return result

    
def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group.

  This is currently hard-coded for the two-drive layout.

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    # 256 MB are added for drbd metadata, 128 MB for each drbd device
    constants.DT_DRBD8: disk_size + swap_size + 256,
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]
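# Worked example: for a drbd8 instance with a 10240 MB data disk and a
# 4096 MB swap disk, the volume group on each node needs at least
# 10240 + 4096 + 256 = 14592 MB free, the extra 256 MB covering the two
# 128 MB DRBD metadata volumes.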

    
def _CheckHVParams(lu, nodenames, hvname, hvparams):
  """Hypervisor parameter validation.

  This function abstracts the hypervisor parameter validation to be
  used in both instance create and instance modify.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type nodenames: list
  @param nodenames: the list of nodes on which we should check
  @type hvname: string
  @param hvname: the name of the hypervisor we should use
  @type hvparams: dict
  @param hvparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
                                                  hvname,
                                                  hvparams)
  for node in nodenames:
    info = hvinfo.get(node, None)
    if not info or not isinstance(info, (tuple, list)):
      raise errors.OpPrereqError("Cannot get current information"
                                 " from node '%s' (%s)" % (node, info))
    if not info[0]:
      raise errors.OpPrereqError("Hypervisor parameter validation failed:"
                                 " %s" % info[1])
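# As the loop above implies, each per-node entry returned by the
# hypervisor_validate_params RPC is expected to be a (success, message)
# pair; anything else is treated as a failure to obtain information from
# that node.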

    
class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk_size",
              "disk_template", "swap_size", "mode", "start",
              "wait_for_sync", "ip_check", "mac",
              "hvparams", "beparams"]
  REQ_BGL = False

  def _ExpandNode(self, node):
    """Expands and checks one node name.

    """
    node_full = self.cfg.ExpandNodeName(node)
    if node_full is None:
      raise errors.OpPrereqError("Unknown node %s" % node)
    return node_full

    
  def ExpandNames(self):
    """ExpandNames for CreateInstance.

    Figure out the right locks for instance creation.

    """
    self.needed_locks = {}

    # set optional parameters to None if they don't exist
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    # cheap checks, mostly valid constants given

    # verify creation mode
    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    if self.op.hypervisor is None:
      self.op.hypervisor = self.cfg.GetHypervisorType()

    cluster = self.cfg.GetClusterInfo()
    enabled_hvs = cluster.enabled_hypervisors
    if self.op.hypervisor not in enabled_hvs:
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
                                 " cluster (%s)" % (self.op.hypervisor,
                                  ",".join(enabled_hvs)))

    # check hypervisor parameter syntax (locally)

    filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
                                  self.op.hvparams)
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
    hv_type.CheckParameterSyntax(filled_hvp)

    # fill and remember the beparams dict
    self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                    self.op.beparams)

    #### instance parameters check

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)
    self.op.instance_name = instance_name = hostname1.name

    # this is just a preventive check, but someone might still add this
    # instance in the meantime, and creation will fail at lock-add time
    if instance_name in self.cfg.GetInstanceList():
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    self.add_locks[locking.LEVEL_INSTANCE] = instance_name

    # ip validity checks
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = self.op.ip = inst_ip
    # used in CheckPrereq for ip ping check
    self.check_ip = hostname1.ip

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # file storage checks
    if (self.op.file_driver and
        self.op.file_driver not in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory path not absolute")

    ### Node/iallocator related checks
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.op.pnode = self._ExpandNode(self.op.pnode)
      nodelist = [self.op.pnode]
      if self.op.snode is not None:
        self.op.snode = self._ExpandNode(self.op.snode)
        nodelist.append(self.op.snode)
      self.needed_locks[locking.LEVEL_NODE] = nodelist

    # in case of import lock the source node too
    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)

      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      self.op.src_node = src_node = self._ExpandNode(src_node)
      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
        self.needed_locks[locking.LEVEL_NODE].append(src_node)

    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

    
  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    disks = [{"size": self.op.disk_size, "mode": "w"},
             {"size": self.op.swap_size, "mode": "w"}]
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
             "bridge": self.op.bridge}]
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.be_full[constants.BE_VCPUS],
                     mem_size=self.be_full[constants.BE_MEMORY],
                     disks=disks,
                     nics=nics,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    logger.ToStdout("Selected nodes for the instance: %s" %
                    (", ".join(ial.nodes),))
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
                (self.op.instance_name, self.op.iallocator, ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]
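  # After a successful run, ial.nodes holds the allocator's choice in role
  # order: nodes[0] becomes the primary node and, when the chosen disk
  # template needs a mirror (required_nodes == 2), nodes[1] becomes the
  # secondary.  A net-mirrored allocation therefore expects exactly two
  # node names back from the iallocator script.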

    
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.be_full[constants.BE_MEMORY],
      vcpus=self.be_full[constants.BE_VCPUS],
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    ))

    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl

    
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = self.op.src_node
      src_path = self.op.src_path

      export_info = self.rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old OSes, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage

    # ip ping checks (we use the same ip that was resolved in ExpandNames)

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (self.check_ip, self.op.instance_name))

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    #### allocator run

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
    assert self.pnode is not None, \
      "Cannot retrieve locked node %s" % self.op.pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if self.op.snode is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")
      if self.op.snode == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(self.op.snode)

    nodenames = [pnode.name] + self.secondaries

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

    # Check lv size requirements
    if req_size is not None:
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                         self.op.hypervisor)
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)

    # os verification
    os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node" % self.op.os_type)

    # bridge check on primary node
    if not self.rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.be_full[constants.BE_MEMORY],
                           self.op.hypervisor)

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

    
  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    ht_kind = self.op.hypervisor
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    ##if self.op.vnc_bind_address is None:
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.cfg.GetFileStorageDir(),
                                        string_file_storage_dir, instance))

    disks = _GenerateDiskTemplate(self,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            beparams=self.op.beparams,
                            hvparams=self.op.hvparams,
                            hypervisor=self.op.hypervisor,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self, iobj):
      _RemoveDisks(self, iobj)
      self.cfg.ReleaseDRBDMinors(instance)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Declare that we don't want to remove the instance lock anymore, as we've
    # added the instance to the config
    del self.remove_locks[locking.LEVEL_INSTANCE]
    # Remove the temp. assignments for the instance's drbds
    self.cfg.ReleaseDRBDMinors(instance)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, iobj)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(self, iobj)
      self.cfg.RemoveInstance(iobj.name)
      # Make sure the instance lock gets removed
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not self.rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        cluster_name = self.cfg.GetClusterName()
        if not self.rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                                src_node, src_image,
                                                cluster_name):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not self.rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")

    
class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = self.rpc.call_instance_list([node],
                                             [instance.hypervisor])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    console_cmd = hyper.GetShellCommandForConsole(instance)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
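# This LU does not open the console itself: it only returns an ssh command
# line (built via ssh.BuildCmd with a forced tty) that the client is
# expected to run on the master node.  Conceptually the result is something
# along the lines of
#   ssh -t root@<primary-node> '<hypervisor console command>'
# although the exact options are whatever ssh.BuildCmd produces; the line
# above is only an illustration.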

    
class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    elif self.op.remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.op.remote_node = remote_node
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

    
  def DeclareLocks(self, level):
    # If we're not already locking all nodes in the set we have to declare the
    # instance's primary/secondary nodes.
    if (level == locking.LEVEL_NODE and
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
      self._LockInstancesNodes()

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    logger.ToStdout("Selected new secondary for the instance: %s" %
                    self.op.remote_node)

    
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

    
  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))

    
  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = self.rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if dev.iv_name not in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not self.rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if dev.iv_name not in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self, dev, oth_node,
                                   oth_node == instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if dev.iv_name not in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(self, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)