Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib.py @ caad16e2

History | View | Annotate | Download (191.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33
import logging
34

    
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import locking
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import serializer
45

    
46

    
47
class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_MASTER: the LU needs to run on the master node
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_MASTER = True
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overriden in derived classes in order to check op
    validity.

    Raises errors.OpPrereqError if a required opcode parameter (per
    _OP_REQP) is missing, if the cluster is not initialized, or if
    REQ_MASTER is set and we are not running on the master node.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    # default: acquire every level exclusively (0 == not shared)
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    # lazily-built SshRunner, exposed via the 'ssh' property below
    self.__ssh = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    if not self.cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    if self.REQ_MASTER:
      master = self.cfg.GetMasterNode()
      if master != utils.HostInfo().name:
        raise errors.OpPrereqError("Commands must be run on the master"
                                   " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object

    Built on first use and cached in self.__ssh afterwards.

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  # read-only property so LUs can say 'self.ssh' without caring about
  # construction; this intentionally shadows the module-level 'ssh' import
  # inside instances
  ssh = property(fget=__GetSSH)

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, ecc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:
      - Use an empty dict if you don't need any lock
      - If you don't need any lock at a particular level omit that level
      - Don't put anything for the BGL level
      - If you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples:
    # Acquire all nodes and one instance
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
    }
    # Acquire just two nodes
    self.needed_locks = {
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
    }
    # Acquire no locks
    self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-node tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: the previous result this LU had, or None in the PRE phase.

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
    # NOTE(review): a single name (string), not a list, is stored here --
    # presumably the locking layer accepts both; confirm before changing
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    If should be called in DeclareLocks in a way similar to:

    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we're really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    # one-shot: drop the marker so a stray second call trips the assert above
    del self.recalculate_locks[locking.LEVEL_NODE]
311
class NoHooksLU(LogicalUnit):
  """Base class for LUs which run no hooks.

  Serves as a shared parent for LogicalUnits that do not use the hook
  machinery, so each of them does not have to disable it individually.

  """
  HTYPE = None
  HPATH = None
    
321

    
322
def _GetWantedNodes(lu, nodes):
  """Expand and validate a list of node names.

  @param lu: the calling LU, used to reach the cluster configuration
  @param nodes: non-empty list of (possibly short) node names
  @return: the expanded node names, nicely sorted
  @raise errors.OpPrereqError: if the argument is not a list or a name
      cannot be expanded

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  expanded = []
  for short_name in nodes:
    full_name = lu.cfg.ExpandNodeName(short_name)
    if full_name is None:
      raise errors.OpPrereqError("No such node name '%s'" % short_name)
    expanded.append(full_name)

  return utils.NiceSort(expanded)
    
345

    
346
def _GetWantedInstances(lu, instances):
  """Expand and validate a list of instance names.

  @param lu: the calling LU, used to reach the cluster configuration
  @param instances: list of (possibly short) instance names, or a false
      value to select every instance in the cluster
  @return: the expanded instance names, nicely sorted
  @raise errors.OpPrereqError: if the argument is not a list or a name
      cannot be expanded

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if not instances:
    # empty selection means "all instances"
    return utils.NiceSort(lu.cfg.GetInstanceList())

  expanded = []
  for short_name in instances:
    full_name = lu.cfg.ExpandInstanceName(short_name)
    if full_name is None:
      raise errors.OpPrereqError("No such instance name '%s'" % short_name)
    expanded.append(full_name)
  return utils.NiceSort(expanded)

    
369

    
370
def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @param static: the valid static field names
  @param dynamic: the valid dynamic field names
  @param selected: the field names actually requested
  @raise errors.OpPrereqError: if any requested field is unknown

  """
  all_fields = frozenset(static) | frozenset(dynamic)

  if not all_fields.issuperset(selected):
    unknown = frozenset(selected).difference(all_fields)
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(unknown))
    
388

    
389
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
390
                          memory, vcpus, nics):
391
  """Builds instance related env variables for hooks from single variables.
392

393
  Args:
394
    secondary_nodes: List of secondary nodes as strings
395
  """
396
  env = {
397
    "OP_TARGET": name,
398
    "INSTANCE_NAME": name,
399
    "INSTANCE_PRIMARY": primary_node,
400
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
401
    "INSTANCE_OS_TYPE": os_type,
402
    "INSTANCE_STATUS": status,
403
    "INSTANCE_MEMORY": memory,
404
    "INSTANCE_VCPUS": vcpus,
405
  }
406

    
407
  if nics:
408
    nic_count = len(nics)
409
    for idx, (ip, bridge, mac) in enumerate(nics):
410
      if ip is None:
411
        ip = ""
412
      env["INSTANCE_NIC%d_IP" % idx] = ip
413
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
414
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
415
  else:
416
    nic_count = 0
417

    
418
  env["INSTANCE_NIC_COUNT"] = nic_count
419

    
420
  return env
421

    
422

    
423
def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @param instance: objects.Instance object of instance
  @param override: dict of values to override the ones taken from the
      instance object
  @return: dict of hook environment variables

  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    # FIX: this previously passed instance.os, so INSTANCE_STATUS in the
    # hook environment wrongly contained the OS name instead of the
    # instance's run status
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)

    
444

    
445
def _CheckInstanceBridgesExist(lu, instance):
  """Verify that all bridges required by an instance exist.

  @param lu: the calling LU, used for its rpc runner
  @param instance: the instance whose NIC bridges are checked
  @raise errors.OpPrereqError: if any bridge is missing on the
      instance's primary node

  """
  required = [nic.bridge for nic in instance.nics]
  if not lu.rpc.call_bridges_exist(instance.primary_node, required):
    raise errors.OpPrereqError("one or more target bridges %s does not"
                               " exist on destination node '%s'" %
                               (required, instance.primary_node))

    
456

    
457
class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    The cluster must be empty: no instances, and no nodes besides the
    master. Any violation is signalled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodes = self.cfg.GetNodeList()
    if len(nodes) != 1 or nodes[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodes) - 1))

    instances = self.cfg.GetInstanceList()
    if instances:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instances))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    Demotes the master node and backs up the cluster SSH key pair.

    """
    master = self.cfg.GetMasterNode()
    if not self.rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")
    # preserve the private and public key files (third element is ignored)
    for key_file in ssh.GetUserFiles(constants.GANETI_RUNAS)[:2]:
      utils.CreateBackup(key_file)
    return master

    
494

    
495
class LUVerifyCluster(LogicalUnit):
496
  """Verifies the cluster status.
497

498
  """
499
  HPATH = "cluster-verify"
500
  HTYPE = constants.HTYPE_CLUSTER
501
  _OP_REQP = ["skip_checks"]
502
  REQ_BGL = False
503

    
504
  def ExpandNames(self):
    """Declare locking needs: every node and instance, all shared."""
    self.needed_locks = {}
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
    # verification only reads cluster state, so all locks can be shared
    self.share_locks = dict([(level, 1) for level in locking.LEVELS])

    
511
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existance and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums
      vglist: volume group data as returned by the node, or a false value
        if the call failed
      node_result: dict of per-check results from call_node_verify
      remote_version: protocol version reported by the node
      feedback_fn: function used to report each problem found

    Returns True if any problem was found (i.e. the node is "bad").

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existance and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # without verify data we cannot run the remaining checks
    if not node_result:
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
      return True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        # NOTE(review): this loop variable rebinds the 'node' parameter;
        # harmless today since 'node' is not used afterwards, but fragile
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        # NOTE(review): same parameter shadowing as above
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result['node-net-test'][node]))

    # hypervisor verification failures are reported but do not set 'bad'
    hyp_result = node_result.get('hypervisor', None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))
    return bad

    
600
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    Args:
      instance: the instance name
      instanceconfig: the instance's configuration object
      node_vol_is: dict of node -> volumes actually present
      node_instance: dict of node -> instances actually running there
      feedback_fn: function used to report each problem found

    Returns True if any problem was found.

    """
    bad = False

    node_current = instanceconfig.primary_node

    # map of node -> volumes this instance should have there
    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    # every expected volume must actually exist on its node
    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    # an instance not marked 'down' must be running on its primary node
    if not instanceconfig.status == 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    # the instance must not be running anywhere but its primary node
    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

    
638
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node, volumes in node_vol_is.items():
      known = node_vol_should.get(node, {})
      for volume in volumes:
        if volume not in known:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

    
655
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    known = frozenset(instancelist)
    for node in node_instance:
      for running in node_instance[node]:
        if running not in known:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (running, node))
          bad = True
    return bad

    
670
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # Each node acting as a secondary must have enough free memory to
      # host all instances for which it is secondary, should any single
      # primary node fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: down instances are counted as well, since someone might
      # want to start them even right after a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = sum(instance_cfg[inst].memory for inst in instances)
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

    
698
  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    invalid = self.skip_set - constants.VERIFY_OPTIONAL_CHECKS
    if invalid:
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

    
709
  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase; their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    # TODO: populate the environment with useful information for verify hooks
    # empty env, no pre-hook nodes, post hooks on every node
    return {}, [], self.cfg.GetNodeList()

    
721
  def Exec(self, feedback_fn):
722
    """Verify integrity of cluster, performing various test on nodes.
723

724
    """
725
    bad = False
726
    feedback_fn("* Verifying global settings")
727
    for msg in self.cfg.VerifyConfig():
728
      feedback_fn("  - ERROR: %s" % msg)
729

    
730
    vg_name = self.cfg.GetVGName()
731
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
732
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
733
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
734
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
735
    i_non_redundant = [] # Non redundant instances
736
    node_volume = {}
737
    node_instance = {}
738
    node_info = {}
739
    instance_cfg = {}
740

    
741
    # FIXME: verify OS list
742
    # do local checksums
743
    file_names = []
744
    file_names.append(constants.SSL_CERT_FILE)
745
    file_names.append(constants.CLUSTER_CONF_FILE)
746
    local_checksums = utils.FingerprintFiles(file_names)
747

    
748
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
749
    all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
750
    all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
751
    all_vglist = self.rpc.call_vg_list(nodelist)
752
    node_verify_param = {
753
      'filelist': file_names,
754
      'nodelist': nodelist,
755
      'hypervisor': hypervisors,
756
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
757
                        for node in nodeinfo]
758
      }
759
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
760
                                           self.cfg.GetClusterName())
761
    all_rversion = self.rpc.call_version(nodelist)
762
    all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
763
                                        self.cfg.GetHypervisorType())
764

    
765
    for node in nodelist:
766
      feedback_fn("* Verifying node %s" % node)
767
      result = self._VerifyNode(node, file_names, local_checksums,
768
                                all_vglist[node], all_nvinfo[node],
769
                                all_rversion[node], feedback_fn)
770
      bad = bad or result
771

    
772
      # node_volume
773
      volumeinfo = all_volumeinfo[node]
774

    
775
      if isinstance(volumeinfo, basestring):
776
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
777
                    (node, volumeinfo[-400:].encode('string_escape')))
778
        bad = True
779
        node_volume[node] = {}
780
      elif not isinstance(volumeinfo, dict):
781
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
782
        bad = True
783
        continue
784
      else:
785
        node_volume[node] = volumeinfo
786

    
787
      # node_instance
788
      nodeinstance = all_instanceinfo[node]
789
      if type(nodeinstance) != list:
790
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
791
        bad = True
792
        continue
793

    
794
      node_instance[node] = nodeinstance
795

    
796
      # node_info
797
      nodeinfo = all_ninfo[node]
798
      if not isinstance(nodeinfo, dict):
799
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
800
        bad = True
801
        continue
802

    
803
      try:
804
        node_info[node] = {
805
          "mfree": int(nodeinfo['memory_free']),
806
          "dfree": int(nodeinfo['vg_free']),
807
          "pinst": [],
808
          "sinst": [],
809
          # dictionary holding all instances this node is secondary for,
810
          # grouped by their primary node. Each key is a cluster node, and each
811
          # value is a list of instances which have the key as primary and the
812
          # current node as secondary.  this is handy to calculate N+1 memory
813
          # availability if you can only failover from a primary to its
814
          # secondary.
815
          "sinst-by-pnode": {},
816
        }
817
      except ValueError:
818
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
819
        bad = True
820
        continue
821

    
822
    node_vol_should = {}
823

    
824
    for instance in instancelist:
825
      feedback_fn("* Verifying instance %s" % instance)
826
      inst_config = self.cfg.GetInstanceInfo(instance)
827
      result =  self._VerifyInstance(instance, inst_config, node_volume,
828
                                     node_instance, feedback_fn)
829
      bad = bad or result
830

    
831
      inst_config.MapLVsByNode(node_vol_should)
832

    
833
      instance_cfg[instance] = inst_config
834

    
835
      pnode = inst_config.primary_node
836
      if pnode in node_info:
837
        node_info[pnode]['pinst'].append(instance)
838
      else:
839
        feedback_fn("  - ERROR: instance %s, connection to primary node"
840
                    " %s failed" % (instance, pnode))
841
        bad = True
842

    
843
      # If the instance is non-redundant we cannot survive losing its primary
844
      # node, so we are not N+1 compliant. On the other hand we have no disk
845
      # templates with more than one secondary so that situation is not well
846
      # supported either.
847
      # FIXME: does not support file-backed instances
848
      if len(inst_config.secondary_nodes) == 0:
849
        i_non_redundant.append(instance)
850
      elif len(inst_config.secondary_nodes) > 1:
851
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
852
                    % instance)
853

    
854
      for snode in inst_config.secondary_nodes:
855
        if snode in node_info:
856
          node_info[snode]['sinst'].append(instance)
857
          if pnode not in node_info[snode]['sinst-by-pnode']:
858
            node_info[snode]['sinst-by-pnode'][pnode] = []
859
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
860
        else:
861
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
862
                      " %s failed" % (instance, snode))
863

    
864
    feedback_fn("* Verifying orphan volumes")
865
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
866
                                       feedback_fn)
867
    bad = bad or result
868

    
869
    feedback_fn("* Verifying remaining instances")
870
    result = self._VerifyOrphanInstances(instancelist, node_instance,
871
                                         feedback_fn)
872
    bad = bad or result
873

    
874
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
875
      feedback_fn("* Verifying N+1 Memory redundancy")
876
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
877
      bad = bad or result
878

    
879
    feedback_fn("* Other Notes")
880
    if i_non_redundant:
881
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
882
                  % len(i_non_redundant))
883

    
884
    return not bad
885

    
886
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result, handle it, and send some
    nicely-formatted feedback back to the user.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: previous Exec result

    Returns:
      the (possibly updated) lu_result for the POST phase; None for any
      other phase, which is ignored

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          # a False result (rpc failure) is also caught by the isinstance
          # check, so a separate "res is False" test is redundant
          if not isinstance(res, list):
            feedback_fn("    Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result
927

    
928

    
929
class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  For every running, network-mirrored instance this LU checks that its
  LVs exist and are online on the relevant nodes, and reports nodes
  that could not be queried, offline LVs and missing LVs.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    # we need all nodes and all instances, but shared locks suffice since
    # we only read state
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    Returns:
      a tuple (nodes, n_lvm, instances, missing) where:
        nodes: list of nodes which could not be contacted
        n_lvm: map of node name -> LV enumeration error string
        instances: list of instance names with offline LVs
        missing: map of instance name -> list of (node, lv) pairs missing

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    # map of (node, lv_name) -> owning instance, for every LV that
    # should exist somewhere
    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      # only running, net-mirrored instances are relevant here
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      # nothing to check, return the (empty) result structure as-is
      return result

    node_lvs = self.rpc.call_volume_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
        # the error string has no iteritems(); without this continue the
        # per-LV loop below would crash with AttributeError
        continue
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      # LVs found on the node: drop them from nv_dict, flagging the
      # instance if one of its LVs is offline
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result
1007

    
1008

    
1009
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    Hooks run only on the master node, both pre and post.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    Resolves the new name, refuses a no-op rename, and refuses a new IP
    that is already reachable on the network.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      # a live host answering on the new IP means we would clash with it
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    # store the resolved (canonical) name back into the opcode
    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    if not self.rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      # TODO: sstore
      # NOTE(review): "ss" is not defined anywhere in this scope (see the
      # TODO above); these calls will raise NameError at runtime until the
      # sstore rework lands
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        # no need to push the files to ourselves
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = self.rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            # best-effort: log failed copies but keep going
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      # always try to restore the master role, even if the rename failed
      if not self.rpc.call_node_start_master(master, False):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")
1085

    
1086

    
1087
def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether a LD_LV dev_type was found or not

  """
  # a plain LV device is trivially lvm-based
  if disk.dev_type == constants.LD_LV:
    return True
  # otherwise, recurse into the children (if any)
  for child in (disk.children or []):
    if _RecursiveCheckIfLVMBased(child):
      return True
  return False
1102

    
1103

    
1104
class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    master = self.cfg.GetMasterNode()
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    return env, [master], [master]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    # FIXME: This only works because there is only one parameter that can be
    # changed or removed.
    if self.op.vg_name:
      # a volume group was given: verify it exists and is large enough on
      # every locked node
      node_list = self.acquired_locks[locking.LEVEL_NODE]
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))
    else:
      # LVM storage is being disabled: refuse while any instance still
      # has lvm-based disks
      for inst in self.cfg.GetAllInstancesInfo().values():
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    new_vg = self.op.vg_name
    if new_vg == self.cfg.GetVGName():
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")
    else:
      self.cfg.SetVGName(new_vg)
1169

    
1170

    
1171
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  Repeatedly queries the primary node's mirror status until every disk
  reports completion, logging progress along the way.

  Args:
    lu: the calling LogicalUnit (provides cfg, rpc and proc)
    instance: the instance whose disks are being waited on
    oneshot: if True, poll exactly once and return without sleeping
    unlock: accepted but never read in this function  # NOTE(review): dead parameter?

  Returns:
    True if no disk ended up degraded, False otherwise.

  """
  if not instance.disks:
    # no disks at all: nothing to wait for
    return True

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  # attach the per-node physical IDs before querying
  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      # RPC failure; tolerate up to 10 consecutive failed polls
      lu.proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    # a successful poll resets the failure counter
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        lu.proc.LogWarning("Can't compute data for node %s/%s" %
                           (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      # only count a disk as degraded while it reports no progress figure
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        # a non-None percentage means this disk is still syncing
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    # sleep until the next poll, using the node's own estimate but
    # capped at one minute
    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
1227

    
1228

    
1229
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  Args:
    lu: the calling LogicalUnit (provides cfg and rpc)
    dev: the disk object to check
    node: the node to query
    on_primary: whether the disk is assembled on its primary node
    ldisk: see above

  Returns:
    True if the device (and, recursively, its children) is consistent;
    False if degraded, not found, or the node is unreachable.

  """
  lu.cfg.SetDiskID(dev, node)
  if ldisk:
    # index of the ldisk (local storage) flag in the blockdev_find result
    idx = 6
  else:
    # index of the is_degraded flag in the blockdev_find result
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      result = False
    else:
      # a truthy flag at idx means the device is degraded / not ok
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      # NOTE(review): ldisk is not forwarded here, so children are always
      # checked with ldisk=False -- confirm this is intended
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result
1256

    
1257

    
1258
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
    _CheckOutputFields(static=[],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    # all nodes are needed, but shared locks are enough
    self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
    self.share_locks[locking.LEVEL_NODE] = 1

  def CheckPrereq(self):
    """Check prerequisites.

    """

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into an a per-os per-node dictionary

      Args:
        node_list: a list with the names of all nodes
        rlist: a map with node names as keys and OS objects as values

      Returns:
        map: a map with osnames as keys and as value another map, with
             nodes as
             keys and list of OS objects as values
             e.g. {"debian-etch": {"node1": [<object>,...],
                                   "node2": [<object>,]}
                  }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if not nr:
        continue
      for os_obj in nr:
        if os_obj.name not in all_os:
          # seed this OS with an empty list for every known node, so the
          # result always covers the full node list
          all_os[os_obj.name] = dict((nname, []) for nname in node_list)
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.acquired_locks[locking.LEVEL_NODE]
    node_data = self.rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    os_map = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in os_map.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          # valid means every node returned at least one valid OS object
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = dict((nname, [(v.status, v.path) for v in nos_list])
                     for nname, nos_list in os_data.iteritems())
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output
1342

    
1343

    
1344
class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      # use the call form of raise, consistent with the rest of this
      # module (the "raise Class, args" form is obsolete)
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    # refuse removal while any instance uses this node
    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    # store the canonical name back into the opcode
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    # remove the node from the cluster configuration/context first
    self.context.RemoveNode(node.name)

    # then tell the node itself to leave the cluster
    self.rpc.call_node_leave_cluster(node.name)
1411

    
1412

    
1413
class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    # fields that require a live RPC call to the nodes
    self.dynamic_fields = frozenset([
      "dtotal", "dfree",
      "mtotal", "mnode", "mfree",
      "bootid",
      "ctotal",
      ])

    # fields answerable from the configuration alone
    self.static_fields = frozenset([
      "name", "pinst_cnt", "sinst_cnt",
      "pinst_list", "sinst_list",
      "pip", "sip", "tags",
      "serial_no",
      ])

    _CheckOutputFields(static=self.static_fields,
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in the _GetWantedNodes,
    # if non empty, and if empty, there's no validation to do
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      # specific nodes requested without locking: make sure they still
      # exist in the configuration
      nodenames = self.wanted
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
    else:
      nodenames = all_info.keys()
    nodelist = [all_info[name] for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                          self.cfg.GetHypervisorType())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      # give every node its own empty dict; dict.fromkeys(nodenames, {})
      # would share a single mutable dict between all nodes
      live_data = dict((name, {}) for name in nodenames)

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      # build the reverse node -> instances maps
      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field == "serial_no":
          val = node.serial_no
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output
1549

    
1550

    
1551
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False

  def ExpandNames(self):
    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)
    else:
      # no explicit node list: query all nodes
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    node_names = self.nodes
    volumes = self.rpc.call_node_volumes(node_names)

    instances = [self.cfg.GetInstanceInfo(iname)
                 for iname in self.cfg.GetInstanceList()]

    # per-instance map of node -> owned LVs, for the "instance" field
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in instances])

    output = []
    for node in node_names:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = sorted(volumes[node], key=lambda vol: vol['dev'])

      for vol in node_vols:
        row = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            # find the instance owning this LV, '-' if none does
            val = '-'
            for inst in instances:
              if vol['name'] in lv_by_node[inst].get(node, []):
                val = inst.name
                break
          else:
            raise errors.ParameterError(field)
          row.append(str(val))

        output.append(row)

    return output
1628

    
1629

    
1630
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  Resolves the new node's name and IPs, validates them against the
  existing cluster layout, pushes the ssh host keys and cluster files
  to the node, verifies it, and finally registers it in the
  configuration.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  # NOTE(review): CheckPrereq/Exec also read self.op.readd, which is not
  # listed here -- presumably the opcode always supplies it; verify
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    # primary_ip/secondary_ip on the opcode are filled in by CheckPrereq
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    # pre-hooks run on the current nodes, post-hooks also on the new one
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) matches the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    # resolve the name; also normalises it to the FQDN
    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    # the secondary IP defaults to the primary one (single-homed node)
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    # membership check: a fresh add must not exist yet, a readd must
    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      # on readd, the node itself may (and must) keep its old addresses
      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      # no IP of the new node may clash with any IP of an existing node
      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachablity
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    # the Node object is only stored on the LU here; Exec registers it
    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    result = self.rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    # host DSA/RSA key pairs plus the cluster user's key pair
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
                                    keyarray[2],
                                    keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    # a dual-homed node must actually own the secondary IP it claims
    if new_node.secondary_ip != new_node.primary_ip:
      if not self.rpc.call_node_has_ip_address(new_node.name,
                                               new_node.secondary_ip):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    # run a node-verify from the master against the new node only
    node_verify_list = [self.cfg.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
                                       self.cfg.GetClusterName())
    for verifier in node_verify_list:
      if not result[verifier]:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                 " for remote verification" % verifier)
      if result[verifier]['nodelist']:
        for failed in result[verifier]['nodelist']:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, result[verifier]['nodelist'][failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    # on a fresh add the node is not yet in GetNodeList() (it is only
    # registered at the end of this method), so include it explicitly
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = self.rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          # copy failures are logged but not fatal
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    # the VNC password file is only needed when HVM is enabled
    to_copy = []
    if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      result = self.rpc.call_upload_file([node], fname)
      if not result[node]:
        logger.Error("could not copy file %s to node %s" % (fname, node))

    # finally register the node in the cluster context/configuration
    if self.op.readd:
      self.context.ReaddNode(new_node)
    else:
      self.context.AddNode(new_node)
class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  This is a read-only LU: it needs no locks and has no prerequisites.

  """
  _OP_REQP = []
  REQ_MASTER = False
  REQ_BGL = False

  def ExpandNames(self):
    # reading the configuration requires no locks at all
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    return {
      "name": self.cfg.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.cfg.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      "hypervisor_type": self.cfg.GetHypervisorType(),
      "enabled_hypervisors": cluster.enabled_hypervisors,
      }
class LUQueryConfigValues(NoHooksLU):
  """Return configuration values.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

    # only these static fields may be requested
    _CheckOutputFields(static=["cluster_name", "master_node"],
                       dynamic=[],
                       selected=self.op.output_fields)

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    # map each supported field to the function retrieving its value
    getters = {
      "cluster_name": self.cfg.GetClusterName,
      "master_node": self.cfg.GetMasterNode,
      }
    values = []
    for field in self.op.output_fields:
      if field not in getters:
        raise errors.ParameterError(field)
      values.append(getters[field]())
    return values
class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # node locks are computed later, once the instance lock is held
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    inst = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert inst is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = inst

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    assembled, disk_mapping = _AssembleInstanceDisks(self, self.instance)
    if not assembled:
      raise errors.OpExecError("Cannot activate block devices")
    return disk_mapping
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    lu: the logical unit on whose behalf we execute (provides cfg/rpc)
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    a tuple (disks_ok, device_info) where disks_ok is a boolean (False
    if any required assembly failed) and device_info is a list of
    (node, instance_visible_name, assemble_result) tuples for the
    primary node

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occured, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    # 'result' here is the value from the primary-node assemble above;
    # NOTE(review): this assumes the primary node occurs in the disk's
    # ComputeNodeTree -- confirm, otherwise 'result' is stale/undefined
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info
def _StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  Assembles all block devices of the instance; on failure the devices
  are torn down again and an OpExecError is raised.

  """
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
                                           ignore_secondaries=force)
  if disks_ok:
    return
  # assembly failed: undo what we did and abort
  _ShutdownInstanceDisks(lu, instance)
  if force is not None and not force:
    logger.Error("If the message above refers to a secondary node,"
                 " you can retry the operation using '--force'.")
  raise errors.OpExecError("Disk consistency error")
class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # node locks are computed later, once the instance lock is held
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    inst = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert inst is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = inst

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    # refuses to act if the instance is still running on its node
    _SafeShutdownInstanceDisks(self, self.instance)
def _SafeShutdownInstanceDisks(lu, instance):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  Raises errors.OpExecError if the primary node cannot be contacted or
  the instance is still running there.

  """
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
                                    [instance.hypervisor])
  ins_l = ins_l[instance.primary_node]
  # a non-list answer means the RPC failed; refuse to touch the disks.
  # (isinstance replaces the non-idiomatic "not type(ins_l) is list",
  # which also rejected list subclasses)
  if not isinstance(ins_l, list):
    raise errors.OpExecError("Can't contact node '%s'" %
                             instance.primary_node)

  if instance.name in ins_l:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")

  _ShutdownInstanceDisks(lu, instance)
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2074
  """Shutdown block devices of an instance.
2075

2076
  This does the shutdown on all nodes of the instance.
2077

2078
  If the ignore_primary is false, errors on the primary node are
2079
  ignored.
2080

2081
  """
2082
  result = True
2083
  for disk in instance.disks:
2084
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2085
      lu.cfg.SetDiskID(top_disk, node)
2086
      if not lu.rpc.call_blockdev_shutdown(node, top_disk):
2087
        logger.Error("could not shutdown block device %s on node %s" %
2088
                     (disk.iv_name, node))
2089
        if not ignore_primary or node != instance.primary_node:
2090
          result = False
2091
  return result
2092

    
2093

    
2094
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2095
  """Checks if a node has enough free memory.
2096

2097
  This function check if a given node has the needed amount of free
2098
  memory. In case the node has less memory or we cannot get the
2099
  information from the node, this function raise an OpPrereqError
2100
  exception.
2101

2102
  @type lu: C{LogicalUnit}
2103
  @param lu: a logical unit from which we get configuration data
2104
  @type node: C{str}
2105
  @param node: the node to check
2106
  @type reason: C{str}
2107
  @param reason: string to use in the error message
2108
  @type requested: C{int}
2109
  @param requested: the amount of memory in MiB to check for
2110
  @type hypervisor: C{str}
2111
  @param hypervisor: the hypervisor to ask for memory stats
2112
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2113
      we cannot check the node
2114

2115
  """
2116
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2117
  if not nodeinfo or not isinstance(nodeinfo, dict):
2118
    raise errors.OpPrereqError("Could not contact node %s for resource"
2119
                             " information" % (node,))
2120

    
2121
  free_mem = nodeinfo[node].get('memory_free')
2122
  if not isinstance(free_mem, int):
2123
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2124
                             " was '%s'" % (node, free_mem))
2125
  if requested > free_mem:
2126
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2127
                             " needed %s MiB, available %s MiB" %
2128
                             (node, reason, requested, free_mem))
2129

    
2130

    
2131
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # node locks are computed later, once the instance lock is held
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster, its bridges exist
    and its primary node has enough free memory to start it.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # check bridges existance
    _CheckInstanceBridgesExist(self, instance)

    _CheckNodeFreeMemory(self, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory, instance.hypervisor)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    # extra_args is optional on the opcode; default to empty
    extra_args = getattr(self.op, "extra_args", "")

    # NOTE(review): the instance is marked up in the configuration
    # before the actual start attempt -- presumably deliberate so the
    # recorded desired state is "up" even if the start fails; confirm
    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self, instance, force)

    if not self.rpc.call_instance_start(node_current, instance, extra_args):
      # the start failed: bring the disks back down before aborting
      _ShutdownInstanceDisks(self, instance)
      raise errors.OpExecError("Could not start instance")
class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  Supports three reboot types: soft and hard (delegated to the
  hypervisor on the primary node) and full (shutdown, disk
  deactivation/reactivation and fresh start).

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    # validate the reboot type before acquiring any locks
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Only a full reboot (which deactivates/reactivates the disks)
      # touches the secondary nodes; soft and hard reboots act on the
      # primary node alone.  BUGFIX: this used to be computed as
      # "not constants.INSTANCE_REBOOT_FULL", which negates a constant
      # and therefore always evaluated to False, locking all nodes for
      # every reboot type.
      primary_only = self.op.reboot_type != constants.INSTANCE_REBOOT_FULL
      self._LockInstancesNodes(primary_only=primary_only)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # check bridges existance
    _CheckInstanceBridgesExist(self, instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    # extra_args is optional on the opcode; default to empty
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      # soft/hard reboots are handled entirely by the hypervisor
      if not self.rpc.call_instance_reboot(node_current, instance,
                                           reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      # full reboot: stop the instance, cycle the disks, start again
      if not self.rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(self, instance)
      _StartInstanceDisks(self, instance, ignore_secondaries)
      if not self.rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)
class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # node locks are computed later, once the instance lock is held
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nodes = [self.cfg.GetMasterNode(), self.instance.primary_node]
    nodes.extend(self.instance.secondary_nodes)
    return env, nodes, nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    inst = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert inst is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = inst

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    inst = self.instance
    # record the new state in the config, then do the actual shutdown;
    # a failed shutdown is only logged, the disks are torn down anyway
    self.cfg.MarkInstanceDown(inst.name)
    if not self.rpc.call_instance_shutdown(inst.primary_node, inst):
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(self, inst)
class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  Re-runs the OS creation scripts on a stopped instance, optionally
  switching it to a different OS first.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # node locks are computed later, once the instance lock is held
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    # double-check with the primary node that the instance is not
    # actually running there
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    # os_type is optional on the opcode; None means keep the current OS
    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        # BUGFIX: this message used to interpolate self.op.pnode, an
        # attribute this opcode does not have, which would raise
        # AttributeError instead of the intended OpPrereqError
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
      os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node" % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.Update(inst)

    _StartInstanceDisks(self, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not self.rpc.call_instance_os_add(inst.primary_node, inst,
                                           "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      # always bring the disks back down, even if the OS install failed
      _ShutdownInstanceDisks(self, inst)
class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running,
    and that the new name is valid and not already taken (either as an
    instance name or, unless ignore_ip is set, as a live IP address).

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    # double-check via RPC that the instance really is not running
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    Renames the instance in the configuration (and in the lock manager),
    moves any file-based storage directory, and finally runs the OS
    rename script on the primary node.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
                                                     old_file_storage_dir,
                                                     new_file_storage_dir)

      if not result:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self, inst, None)
    try:
      if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
                                               old_name,
                                               "sda", "sdb"):
        # non-fatal: the rename already happened in the config, so only warn
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(self, inst)
2529

    
2530

    
2531
class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]
  REQ_BGL = False

  def ExpandNames(self):
    """Acquire the instance lock; node locks are recalculated later."""
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    """Lock the instance's nodes once the node level is reached."""
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    nl = [self.cfg.GetMasterNode()]
    return _BuildInstanceHookEnvByObject(self.instance), nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    inst = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (inst.name, inst.primary_node))

    # shut the instance down; failures are fatal unless ignore_failures
    if not self.rpc.call_instance_shutdown(inst.primary_node, inst):
      if not self.op.ignore_failures:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (inst.name, inst.primary_node))
      feedback_fn("Warning: can't shutdown instance")

    logger.Info("removing block devices for instance %s" % inst.name)

    if not _RemoveDisks(self, inst):
      if not self.op.ignore_failures:
        raise errors.OpExecError("Can't remove instance's disks")
      feedback_fn("Warning: can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % inst.name)

    self.cfg.RemoveInstance(inst.name)
    self.remove_locks[locking.LEVEL_INSTANCE] = inst.name
2596

    
2597

    
2598
class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    """Compute the field sets and the locks needed for the query.

    Locks are acquired only when at least one requested field is
    dynamic, i.e. needs a live RPC query to the nodes.

    """
    # fields that require a live RPC call to the nodes
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    # fields answerable from the configuration alone
    self.static_fields = frozenset([
      "name", "os", "pnode", "snodes",
      "admin_state", "admin_ram",
      "disk_template", "ip", "mac", "bridge",
      "sda_size", "sdb_size", "vcpus", "tags",
      "network_port", "kernel_path", "initrd_path",
      "hvm_boot_order", "hvm_acpi", "hvm_pae",
      "hvm_cdrom_image_path", "hvm_nic_type",
      "hvm_disk_type", "vnc_bind_address",
      "serial_no", "hypervisor",
      ])
    _CheckOutputFields(static=self.static_fields,
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    # we only need to lock if live (dynamic) data was requested
    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.do_locking:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllInstancesInfo()
    if self.do_locking:
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
    elif self.wanted != locking.ALL_SET:
      instance_names = self.wanted
      missing = set(instance_names).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some instances were removed before retrieving their data: %s"
          % missing)
    else:
      instance_names = all_info.keys()
    instance_list = [all_info[iname] for iname in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])
    hv_list = list(set([inst.hypervisor for inst in instance_list]))

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result is False:
          # an explicit False means the RPC to this node failed; use "is"
          # so other falsy values (e.g. an empty dict, meaning "no instance
          # alive") are not mistaken for a node error
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          disk = instance.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        elif field == "vcpus":
          val = instance.vcpus
        elif field == "tags":
          val = list(instance.GetTags())
        elif field == "serial_no":
          val = instance.serial_no
        elif field in ("network_port", "kernel_path", "initrd_path",
                       "hvm_boot_order", "hvm_acpi", "hvm_pae",
                       "hvm_cdrom_image_path", "hvm_nic_type",
                       "hvm_disk_type", "vnc_bind_address"):
          val = getattr(instance, field, None)
          if val is not None:
            pass
          elif field in ("hvm_nic_type", "hvm_disk_type",
                         "kernel_path", "initrd_path"):
            val = "default"
          else:
            val = "-"
        elif field == "hypervisor":
          val = instance.hypervisor
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output
2769

    
2770

    
2771
class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    """Acquire the instance lock; node locks are recalculated later."""
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    """Lock the instance's nodes at the node locking level."""
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    self.instance = instance
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # only network-mirrored templates can be failed over
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
                         instance.name, instance.memory,
                         instance.hypervisor)

    # check bridge existance
    brlist = [nic.bridge for nic in instance.nics]
    if not self.rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance
    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for disk in instance.disks:
      # for drbd, these are drbd over lvm
      if _CheckDiskConsistency(self, disk, target_node, False):
        continue
      if instance.status == "up" and not self.op.ignore_consistency:
        raise errors.OpExecError("Disk %s is degraded on target node,"
                                 " aborting failover." % disk.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not self.rpc.call_instance_shutdown(source_node, instance):
      if not self.op.ignore_consistency:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))
      logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                   " anyway. Please make sure node %s is down"  %
                   (instance.name, source_node, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.status != "up":
      return

    feedback_fn("* activating the instance's disks on target node")
    logger.Info("Starting instance %s on node %s" %
                (instance.name, target_node))

    disks_ok, _ = _AssembleInstanceDisks(self, instance,
                                         ignore_secondaries=True)
    if not disks_ok:
      _ShutdownInstanceDisks(self, instance)
      raise errors.OpExecError("Can't activate the instance's disks")

    feedback_fn("* starting the instance on the target node")
    if not self.rpc.call_instance_start(target_node, instance, None):
      _ShutdownInstanceDisks(self, instance)
      raise errors.OpExecError("Could not start instance %s on node %s." %
                               (instance.name, target_node))
2892

    
2893

    
2894
def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
2895
  """Create a tree of block devices on the primary node.
2896

2897
  This always creates all devices.
2898

2899
  """
2900
  if device.children:
2901
    for child in device.children:
2902
      if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
2903
        return False
2904

    
2905
  lu.cfg.SetDiskID(device, node)
2906
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
2907
                                       instance.name, True, info)
2908
  if not new_id:
2909
    return False
2910
  if device.physical_id is None:
2911
    device.physical_id = new_id
2912
  return True
2913

    
2914

    
2915
def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
2916
  """Create a tree of block devices on a secondary node.
2917

2918
  If this device type has to be created on secondaries, create it and
2919
  all its children.
2920

2921
  If not, just recurse to children keeping the same 'force' value.
2922

2923
  """
2924
  if device.CreateOnSecondary():
2925
    force = True
2926
  if device.children:
2927
    for child in device.children:
2928
      if not _CreateBlockDevOnSecondary(lu, node, instance,
2929
                                        child, force, info):
2930
        return False
2931

    
2932
  if not force:
2933
    return True
2934
  lu.cfg.SetDiskID(device, node)
2935
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
2936
                                       instance.name, False, info)
2937
  if not new_id:
2938
    return False
2939
  if device.physical_id is None:
2940
    device.physical_id = new_id
2941
  return True
2942

    
2943

    
2944
def _GenerateUniqueNames(lu, exts):
2945
  """Generate a suitable LV name.
2946

2947
  This will generate a logical volume name for the given instance.
2948

2949
  """
2950
  results = []
2951
  for val in exts:
2952
    new_id = lu.cfg.GenerateUniqueID()
2953
    results.append("%s%s" % (new_id, val))
2954
  return results
2955

    
2956

    
2957
def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
                         p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  Builds the data and (128 MB) metadata LVs and wraps them in a DRBD8
  disk object connecting the two given nodes/minors.

  """
  port = lu.cfg.AllocatePort()
  vg_name = lu.cfg.GetVGName()
  secret = lu.cfg.GenerateDRBDSecret()
  data_dev = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vg_name, names[0]))
  meta_dev = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vg_name, names[1]))
  return objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                      logical_id=(primary, secondary, port,
                                  p_minor, s_minor, secret),
                      children=[data_dev, meta_dev],
                      iv_name=iv_name)
2976

    
2977

    
2978
def _GenerateDiskTemplate(lu, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = lu.cfg.GetVGName()

  if template_name == constants.DT_DISKLESS:
    return []

  if template_name == constants.DT_PLAIN:
    if secondary_nodes:
      raise errors.ProgrammerError("Wrong template configuration")

    lv_names = _GenerateUniqueNames(lu, [".sda", ".sdb"])
    sda = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                       logical_id=(vgname, lv_names[0]),
                       iv_name="sda")
    sdb = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                       logical_id=(vgname, lv_names[1]),
                       iv_name="sdb")
    return [sda, sdb]

  if template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    # four minors: sda/sdb on the primary and on the remote node
    (minor_pa, minor_pb,
     minor_sa, minor_sb) = lu.cfg.AllocateDRBDMinor(
      [primary_node, primary_node, remote_node, remote_node], instance_name)

    lv_names = _GenerateUniqueNames(lu, [".sda_data", ".sda_meta",
                                         ".sdb_data", ".sdb_meta"])
    sda = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                               disk_sz, lv_names[0:2], "sda",
                               minor_pa, minor_sa)
    sdb = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                               swap_sz, lv_names[2:4], "sdb",
                               minor_pb, minor_sb)
    return [sda, sdb]

  if template_name == constants.DT_FILE:
    if secondary_nodes:
      raise errors.ProgrammerError("Wrong template configuration")

    sda = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
                       iv_name="sda", logical_id=(file_driver,
                       "%s/sda" % file_storage_dir))
    sdb = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
                       iv_name="sdb", logical_id=(file_driver,
                       "%s/sdb" % file_storage_dir))
    return [sda, sdb]

  raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3033

    
3034

    
3035
def _GetInstanceInfoText(instance):
3036
  """Compute that text that should be added to the disk's metadata.
3037

3038
  """
3039
  return "originstname+%s" % instance.name
3040

    
3041

    
3042
def _CreateDisks(lu, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  # for file-based templates, the storage directory must exist first
  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    rpc_result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
                                                     file_storage_dir)

    if not rpc_result:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False

    if not rpc_result[0]:
      logger.Error("failed to create directory '%s'" % file_storage_dir)
      return False

  for disk in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (disk.iv_name, instance.name))
    #HARDCODE
    for snode in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(lu, snode, instance,
                                        disk, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (disk.iv_name, disk, snode))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
                                    instance, disk, info):
      logger.Error("failed to create volume %s on primary!" %
                   disk.iv_name)
      return False

  return True
3087

    
3088

    
3089
def _RemoveDisks(lu, instance):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal proces

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  all_removed = True
  for disk in instance.disks:
    for node, subdev in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(subdev, node)
      if not lu.rpc.call_blockdev_remove(node, subdev):
        # best-effort: log and keep going with the remaining devices
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (disk.iv_name, node))
        all_removed = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
                                               file_storage_dir):
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      all_removed = False

  return all_removed
3124

    
3125

    
3126
def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group

  This is currently hard-coded for the two-drive layout.

  """
  # Required free disk space as a function of disk and swap space
  try:
    return {
      constants.DT_DISKLESS: None,
      constants.DT_PLAIN: disk_size + swap_size,
      # 256 MB are added for drbd metadata, 128MB for each drbd device
      constants.DT_DRBD8: disk_size + swap_size + 256,
      constants.DT_FILE: None,
    }[disk_template]
  except KeyError:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" %  disk_template)
3146

    
3147

    
3148
class LUCreateInstance(LogicalUnit):
  """Create an instance.

  Two creation modes are supported (self.op.mode): INSTANCE_CREATE,
  which runs the OS creation scripts on the new disks, and
  INSTANCE_IMPORT, which restores the disk contents from a previously
  exported image found on self.op.src_node/self.op.src_path.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync", "ip_check", "mac"]
  REQ_BGL = False

  def _ExpandNode(self, node):
    """Expands and checks one node name.

    Args:
      node: the (possibly short) name of a node

    Returns:
      The fully-expanded node name; raises OpPrereqError if the name
      is not known to the cluster configuration.

    """
    node_full = self.cfg.ExpandNodeName(node)
    if node_full is None:
      raise errors.OpPrereqError("Unknown node %s" % node)
    return node_full

  def ExpandNames(self):
    """ExpandNames for CreateInstance.

    Figure out the right locks for instance creation.

    Besides computing the needed locks, this also performs all the
    checks that can be done cheaply without talking to other nodes
    (parameter validity, name resolution, node name expansion).

    """
    self.needed_locks = {}

    # set optional parameters to none if they don't exist
    for attr in ["kernel_path", "initrd_path", "pnode", "snode",
                 "iallocator", "hvm_boot_order", "hvm_acpi", "hvm_pae",
                 "hvm_cdrom_image_path", "hvm_nic_type", "hvm_disk_type",
                 "vnc_bind_address", "hypervisor"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    # cheap checks, mostly valid constants given

    # verify creation mode
    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    # default to the cluster-wide hypervisor when none was requested
    if self.op.hypervisor is None:
      self.op.hypervisor = self.cfg.GetHypervisorType()

    enabled_hvs = self.cfg.GetClusterInfo().enabled_hypervisors
    if self.op.hypervisor not in enabled_hvs:
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
                                 " cluster (%s)" % (self.op.hypervisor,
                                  ",".join(enabled_hvs)))

    #### instance parameters check

    # instance name verification (also resolves the name via DNS)
    hostname1 = utils.HostInfo(self.op.instance_name)
    self.op.instance_name = instance_name = hostname1.name

    # this is just a preventive check, but someone might still add this
    # instance in the meantime, and creation will fail at lock-add time
    if instance_name in self.cfg.GetInstanceList():
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    # register the new instance name as a lock to be added at acquire time
    self.add_locks[locking.LEVEL_INSTANCE] = instance_name

    # ip validity checks: "none" means no IP, "auto" means use the
    # resolved address, anything else must be a valid IP
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = self.op.ip = inst_ip
    # used in CheckPrereq for ip ping check
    self.check_ip = hostname1.ip

    # MAC address verification ("auto" is resolved later, in Exec)
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")
    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    # the per-instance dir is relative to the cluster file storage dir
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory path not absolute")

    ### Node/iallocator related checks
    # exactly one of iallocator/pnode must be given
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator:
      # the allocator may pick any node, so we must lock them all
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.op.pnode = self._ExpandNode(self.op.pnode)
      nodelist = [self.op.pnode]
      if self.op.snode is not None:
        self.op.snode = self._ExpandNode(self.op.snode)
        nodelist.append(self.op.snode)
      self.needed_locks[locking.LEVEL_NODE] = nodelist

    # in case of import lock the source node too
    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)

      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      self.op.src_node = src_node = self._ExpandNode(src_node)
      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
        self.needed_locks[locking.LEVEL_NODE].append(src_node)

    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    On success this fills in self.op.pnode (and self.op.snode when the
    allocator says two nodes are required) from the allocator's answer.

    """
    disks = [{"size": self.op.disk_size, "mode": "w"},
             {"size": self.op.swap_size, "mode": "w"}]
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
             "bridge": self.op.bridge}]
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.op.vcpus,
                     mem_size=self.op.mem_size,
                     disks=disks,
                     nics=nics,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    # sanity check: the allocator must return exactly as many nodes
    # as the disk template requires
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    logger.ToStdout("Selected nodes for the instance: %s" %
                    (", ".join(ial.nodes),))
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
                (self.op.instance_name, self.op.iallocator, ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    Returns the environment dict plus the node list (master, primary
    and secondaries), used for both the pre- and post-hooks.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    ))

    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This performs the checks that need remote information: export
    validity (in import mode), IP conflicts, free disk space and
    memory on the target nodes, OS availability, bridge existence and
    the hypervisor-specific parameters.

    """
    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = self.op.src_node
      src_path = self.op.src_path

      export_info = self.rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage

    # ip ping checks (we use the same ip that was resolved in ExpandNames)

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      # if something answers on the instance's IP, the address is taken
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (self.check_ip, self.op.instance_name))

    # bridge verification: default to the cluster bridge if not given
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    #### allocator run

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
    assert self.pnode is not None, \
      "Cannot retrieve locked node %s" % self.op.pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if self.op.snode is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")
      if self.op.snode == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(self.op.snode)

    # req_size is None for templates that don't use the volume group
    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                         self.op.hypervisor)
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node"  % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")

    # bridge check on primary node
    if not self.rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.op.mem_size, self.op.hypervisor)

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      # FIXME (als): shouldn't these checks happen on the destination node?
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    # Xen HVM device type checks
    if self.op.hypervisor == constants.HT_XEN_HVM:
      if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
        raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
                                   " hypervisor" % self.op.hvm_nic_type)
      if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
        raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
                                   " hypervisor" % self.op.hvm_disk_type)

    # remember the desired run state for Exec and the hooks env
    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    The order of operations is: create the disks, add the instance to
    the configuration, wait for the disks to sync, run the OS
    create/import scripts and optionally start the instance.  Failed
    disk creation or a degraded sync rolls back what was done so far.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    # build the instance's single NIC
    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    # allocate a cluster-unique network port for hypervisors that need one
    ht_kind = self.op.hypervisor
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    if self.op.vnc_bind_address is None:
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.cfg.GetFileStorageDir(),
                                        string_file_storage_dir, instance))

    disks = _GenerateDiskTemplate(self,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            hvm_acpi=self.op.hvm_acpi,
                            hvm_pae=self.op.hvm_pae,
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
                            vnc_bind_address=self.op.vnc_bind_address,
                            hvm_nic_type=self.op.hvm_nic_type,
                            hvm_disk_type=self.op.hvm_disk_type,
                            hypervisor=self.op.hypervisor,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self, iobj):
      # roll back any devices that were created before the failure
      _RemoveDisks(self, iobj)
      self.cfg.ReleaseDRBDMinors(instance)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Declare that we don't want to remove the instance lock anymore, as we've
    # added the instance to the config
    del self.remove_locks[locking.LEVEL_INSTANCE]
    # Remove the temp. assignements for the instance's drbds
    self.cfg.ReleaseDRBDMinors(instance)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, iobj)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      # undo both the disk creation and the config addition
      _RemoveDisks(self, iobj)
      self.cfg.RemoveInstance(iobj.name)
      # Make sure the instance lock gets removed
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not self.rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        cluster_name = self.cfg.GetClusterName()
        if not self.rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                                src_node, src_image,
                                                cluster_name):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not self.rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")
3645

    
3646

    
3647
class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    # only the instance lock is needed for a console connection
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % instance_name

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    inst = self.instance
    pnode = inst.primary_node

    # check that the instance is actually running on its primary node
    running = self.rpc.call_instance_list([pnode],
                                          [inst.hypervisor])[pnode]
    if running is False:
      raise errors.OpExecError("Can't connect to node %s." % pnode)
    if inst.name not in running:
      raise errors.OpExecError("Instance %s is not running." % inst.name)

    logger.Debug("connecting to console of %s on %s" % (inst.name, pnode))

    # the hypervisor knows how to reach its own console
    console_cmd = hypervisor.GetHypervisor(
      inst.hypervisor).GetShellCommandForConsole(inst)

    # build ssh cmdline the caller must execute on the master node
    return self.ssh.BuildCmd(pnode, "root", console_cmd, batch=True, tty=True)
3693

    
3694

    
3695
class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  # hooks script subdirectory for this LU
  HPATH = "mirrors-replace"
  # hooks act at instance level
  HTYPE = constants.HTYPE_INSTANCE
  # required opcode parameters; "remote_node"/"iallocator" are optional
  _OP_REQP = ["instance_name", "mode", "disks"]
  # this LU does its own fine-grained locking, no Big Ganeti Lock needed
  REQ_BGL = False
3703

    
3704
  def ExpandNames(self):
    """Acquire the instance lock and work out which node locks we need.

    Three cases: an iallocator will pick the new secondary (we must
    lock all nodes), the user named the new secondary (lock it
    explicitly plus the instance's own nodes), or no new node is
    involved (lock just the instance's nodes).

    """
    self._ExpandAndLockInstance()

    # normalize an absent remote_node attribute to None
    self.op.remote_node = getattr(self.op, "remote_node", None)

    iallocator = getattr(self.op, "iallocator", None)
    if iallocator is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      # the allocator may pick any node, so every node must be locked
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    elif self.op.remote_node is not None:
      expanded_node = self.cfg.ExpandNodeName(self.op.remote_node)
      if expanded_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.op.remote_node = expanded_node
      # lock the named node now; the instance's nodes are appended later
      self.needed_locks[locking.LEVEL_NODE] = [expanded_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      # no new node involved: only the instance's own nodes get locked
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3727

    
3728
  def DeclareLocks(self, level):
    """Declare the instance's node locks at the node level.

    """
    if level != locking.LEVEL_NODE:
      return
    # If we are not already locking the whole node set, the instance's
    # primary/secondary nodes still have to be added to the lock set.
    if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
      self._LockInstancesNodes()
3734

    
3735
  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    Runs the configured iallocator in relocation mode and stores the
    chosen node in self.op.remote_node.

    Raises:
      errors.OpPrereqError: if the allocator fails or returns an
        unexpected number of nodes

    """
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      # BUGFIX: the format string has three conversions; the allocator
      # name was missing from the argument tuple, which made this raise
      # a TypeError instead of the intended OpPrereqError
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    logger.ToStdout("Selected new secondary for the instance: %s" %
                    self.op.remote_node)
3757

    
3758
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    # replace-disks specific variables first, then the generic
    # per-instance environment (which therefore wins on key clashes,
    # exactly as before)
    env = dict(MODE=self.op.mode,
               NEW_SECONDARY=self.op.remote_node,
               OLD_SECONDARY=self.instance.secondary_nodes[0])
    env.update(_BuildInstanceHookEnvByObject(self.instance))

    node_list = [self.cfg.GetMasterNode(), self.instance.primary_node]
    if self.op.remote_node is not None:
      node_list.append(self.op.remote_node)
    return env, node_list, node_list
3777

    
3778
  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster, that its disk
    layout supports replacement, and computes the target/other/new
    nodes for the chosen replacement mode (possibly rewriting
    self.op.mode and self.op.remote_node along the way).

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

    # disk replacement only makes sense for network-mirrored templates
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    # if an iallocator was requested, let it pick self.op.remote_node
    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      # note: the check below sees the possibly-rewritten mode
      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        # tgt_node is where new storage is created, oth_node is the peer
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    # finally, verify every requested disk actually belongs to the instance
    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
3849

    
3850
  def _ExecD8DiskOnly(self, feedback_fn):
3851
    """Replace a disk on the primary or secondary for dbrd8.
3852

3853
    The algorithm for replace is quite complicated:
3854
      - for each disk to be replaced:
3855
        - create new LVs on the target node with unique names
3856
        - detach old LVs from the drbd device
3857
        - rename old LVs to name_replaced.<time_t>
3858
        - rename new LVs to old LVs
3859
        - attach the new LVs (with the old names now) to the drbd device
3860
      - wait for sync across all devices
3861
      - for each modified disk:
3862
        - remove old LVs (which have the name name_replaces.<time_t>)
3863

3864
    Failures are not very well handled.
3865

3866
    """
3867
    steps_total = 6
3868
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3869
    instance = self.instance
3870
    iv_names = {}
3871
    vgname = self.cfg.GetVGName()
3872
    # start of work
3873
    cfg = self.cfg
3874
    tgt_node = self.tgt_node
3875
    oth_node = self.oth_node
3876

    
3877
    # Step: check device activation
3878
    self.proc.LogStep(1, steps_total, "check device existence")
3879
    info("checking volume groups")
3880
    my_vg = cfg.GetVGName()
3881
    results = self.rpc.call_vg_list([oth_node, tgt_node])
3882
    if not results:
3883
      raise errors.OpExecError("Can't list volume groups on the nodes")
3884
    for node in oth_node, tgt_node:
3885
      res = results.get(node, False)
3886
      if not res or my_vg not in res:
3887
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3888
                                 (my_vg, node))
3889
    for dev in instance.disks:
3890
      if not dev.iv_name in self.op.disks:
3891
        continue
3892
      for node in tgt_node, oth_node:
3893
        info("checking %s on %s" % (dev.iv_name, node))
3894
        cfg.SetDiskID(dev, node)
3895
        if not self.rpc.call_blockdev_find(node, dev):
3896
          raise errors.OpExecError("Can't find device %s on node %s" %
3897
                                   (dev.iv_name, node))
3898

    
3899
    # Step: check other node consistency
3900
    self.proc.LogStep(2, steps_total, "check peer consistency")
3901
    for dev in instance.disks:
3902
      if not dev.iv_name in self.op.disks:
3903
        continue
3904
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3905
      if not _CheckDiskConsistency(self, dev, oth_node,
3906
                                   oth_node==instance.primary_node):
3907
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3908
                                 " to replace disks on this node (%s)" %
3909
                                 (oth_node, tgt_node))
3910

    
3911
    # Step: create new storage
3912
    self.proc.LogStep(3, steps_total, "allocate new storage")
3913
    for dev in instance.disks:
3914
      if not dev.iv_name in self.op.disks:
3915
        continue
3916
      size = dev.size
3917
      cfg.SetDiskID(dev, tgt_node)
3918
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3919
      names = _GenerateUniqueNames(self, lv_names)
3920
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3921
                             logical_id=(vgname, names[0]))
3922
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3923
                             logical_id=(vgname, names[1]))
3924
      new_lvs = [lv_data, lv_meta]
3925
      old_lvs = dev.children
3926
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3927
      info("creating new local storage on %s for %s" %
3928
           (tgt_node, dev.iv_name))
3929
      # since we *always* want to create this LV, we use the
3930
      # _Create...OnPrimary (which forces the creation), even if we
3931
      # are talking about the secondary node
3932
      for new_lv in new_lvs:
3933
        if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
3934
                                        _GetInstanceInfoText(instance)):
3935
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3936
                                   " node '%s'" %
3937
                                   (new_lv.logical_id[1], tgt_node))
3938

    
3939
    # Step: for each lv, detach+rename*2+attach
3940
    self.proc.LogStep(4, steps_total, "change drbd configuration")
3941
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3942
      info("detaching %s drbd from local storage" % dev.iv_name)
3943
      if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3944
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3945
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3946
      #dev.children = []
3947
      #cfg.Update(instance)
3948

    
3949
      # ok, we created the new LVs, so now we know we have the needed
3950
      # storage; as such, we proceed on the target node to rename
3951
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3952
      # using the assumption that logical_id == physical_id (which