Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib.py @ 72737a7f

History | View | Annotate | Download (191.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33
import logging
34

    
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import locking
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import serializer
45

    
46

    
47
class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_MASTER: the LU needs to run on the master node
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  # hooks path; None means no hooks are run for this LU
  HPATH = None
  # hooks type, set by subclasses together with HPATH
  HTYPE = None
  # opcode attributes that must be present (i.e. not None) on self.op
  _OP_REQP = []
  REQ_MASTER = True
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    Raises errors.OpPrereqError if a required opcode parameter is
    missing, if the cluster is not initialized, or (for REQ_MASTER LUs)
    if we are not running on the master node.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    # by default every lock level is acquired exclusively (value 0)
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    # lazily-created SshRunner; use the 'ssh' property, not this field
    self.__ssh = None

    # check that every declared-required opcode parameter is present
    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    if not self.cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    if self.REQ_MASTER:
      master = self.cfg.GetMasterNode()
      if master != utils.HostInfo().name:
        raise errors.OpPrereqError("Commands must be run on the master"
                                   " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object, creating it on first use.

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  # read-only property wrapping the lazily-initialized SshRunner
  ssh = property(fget=__GetSSH)

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:
      - Use an empty dict if you don't need any lock
      - If you don't need any lock at a particular level omit that level
      - Don't put anything for the BGL level
      - If you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples:
    # Acquire all nodes and one instance
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
    }
    # Acquire just two nodes
    self.needed_locks = {
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
    }
    # Acquire no locks
    self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-node tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    Args:
      phase: the hooks phase that has just been run
      hook_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: the previous result this LU had, or None in the PRE phase.

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    Raises errors.OpPrereqError if the instance name does not expand to a
    known instance.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      # calling this twice (or after manually setting instance locks) is a
      # programming error, hence the assert
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to:

    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    # drop the marker so a second call without re-arming fails the assert
    del self.recalculate_locks[locking.LEVEL_NODE]
309

    
310

    
311
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  # HPATH None means BuildHooksEnv is never called for these LUs
  HPATH = None
  HTYPE = None
320

    
321

    
322
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  Raises errors.OpPrereqError for a non-list argument or an unknown
  node name; the result is sorted with utils.NiceSort.

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = []
  for short_name in nodes:
    expanded = lu.cfg.ExpandNodeName(short_name)
    if expanded is None:
      raise errors.OpPrereqError("No such node name '%s'" % short_name)
    wanted.append(expanded)

  return utils.NiceSort(wanted)
344

    
345

    
346
def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  An empty/None list means "all instances in the configuration"; the
  result is sorted with utils.NiceSort.

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  # guard clause: no names given means the full instance list
  if not instances:
    return utils.NiceSort(lu.cfg.GetInstanceList())

  wanted = []
  for short_name in instances:
    expanded = lu.cfg.ExpandInstanceName(short_name)
    if expanded is None:
      raise errors.OpPrereqError("No such instance name '%s'" % short_name)
    wanted.append(expanded)
  return utils.NiceSort(wanted)
368

    
369

    
370
def _CheckOutputFields(static, dynamic, selected):
371
  """Checks whether all selected fields are valid.
372

373
  Args:
374
    static: Static fields
375
    dynamic: Dynamic fields
376

377
  """
378
  static_fields = frozenset(static)
379
  dynamic_fields = frozenset(dynamic)
380

    
381
  all_fields = static_fields | dynamic_fields
382

    
383
  if not all_fields.issuperset(selected):
384
    raise errors.OpPrereqError("Unknown output fields selected: %s"
385
                               % ",".join(frozenset(selected).
386
                                          difference(all_fields)))
387

    
388

    
389
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
390
                          memory, vcpus, nics):
391
  """Builds instance related env variables for hooks from single variables.
392

393
  Args:
394
    secondary_nodes: List of secondary nodes as strings
395
  """
396
  env = {
397
    "OP_TARGET": name,
398
    "INSTANCE_NAME": name,
399
    "INSTANCE_PRIMARY": primary_node,
400
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
401
    "INSTANCE_OS_TYPE": os_type,
402
    "INSTANCE_STATUS": status,
403
    "INSTANCE_MEMORY": memory,
404
    "INSTANCE_VCPUS": vcpus,
405
  }
406

    
407
  if nics:
408
    nic_count = len(nics)
409
    for idx, (ip, bridge, mac) in enumerate(nics):
410
      if ip is None:
411
        ip = ""
412
      env["INSTANCE_NIC%d_IP" % idx] = ip
413
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
414
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
415
  else:
416
    nic_count = 0
417

    
418
  env["INSTANCE_NIC_COUNT"] = nic_count
419

    
420
  return env
421

    
422

    
423
def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override

  Returns the dict produced by _BuildInstanceHookEnv.

  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    # BUGFIX: previously this exported instance.os, so INSTANCE_STATUS
    # duplicated INSTANCE_OS_TYPE instead of the actual status
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)
443

    
444

    
445
def _CheckInstanceBridgesExist(lu, instance):
446
  """Check that the brigdes needed by an instance exist.
447

448
  """
449
  # check bridges existance
450
  brlist = [nic.bridge for nic in instance.nics]
451
  if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
452
    raise errors.OpPrereqError("one or more target bridges %s does not"
453
                               " exist on destination node '%s'" %
454
                               (brlist, instance.primary_node))
455

    
456

    
457
class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    remaining_nodes = self.cfg.GetNodeList()
    if len(remaining_nodes) != 1 or remaining_nodes[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(remaining_nodes) - 1))

    remaining_instances = self.cfg.GetInstanceList()
    if remaining_instances:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(remaining_instances))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    Stops the master role on the master node and backs up the cluster
    ssh keys; returns the master node name.

    """
    master = self.cfg.GetMasterNode()
    if not self.rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    for key_file in (priv_key, pub_key):
      utils.CreateBackup(key_file)
    return master
493

    
494

    
495
class LUVerifyCluster(LogicalUnit):
496
  """Verifies the cluster status.
497

498
  """
499
  HPATH = "cluster-verify"
500
  HTYPE = constants.HTYPE_CLUSTER
501
  _OP_REQP = ["skip_checks"]
502
  REQ_BGL = False
503

    
504
  def ExpandNames(self):
    """Declare the locks needed for cluster verification.

    Every node and instance lock is taken, but in shared mode so that
    verification does not block normal cluster operation.

    """
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
510

    
511
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existance and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums
      vglist: volume group data returned by the node
      node_result: dict of per-check results returned by the node
      remote_version: protocol version reported by the node
      feedback_fn: function used to report problems

    Returns:
      True if any problem was found, False otherwise.

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existance and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        # BUGFIX: use a distinct loop variable; previously this loop
        # shadowed the 'node' parameter, so messages emitted after it
        # could name the wrong node
        for remote_node in node_result['nodelist']:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (remote_node, node_result['nodelist'][remote_node]))
    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        # BUGFIX: same parameter-shadowing issue as above
        for remote_node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (remote_node,
                           node_result['node-net-test'][remote_node]))

    hyp_result = node_result.get('hypervisor', None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))
    return bad
595

    
596
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify a single instance.

    Checks that every logical volume the instance needs exists on the
    expected node, and that the instance runs on its primary node (and
    only there); every problem is reported via feedback_fn.

    """
    problems_found = False

    primary = instanceconfig.primary_node

    expected_vols = {}
    instanceconfig.MapLVsByNode(expected_vols)

    # every LV the instance maps must actually be present on its node
    for vnode in expected_vols:
      for lv_name in expected_vols[vnode]:
        if vnode not in node_vol_is or lv_name not in node_vol_is[vnode]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (lv_name, vnode))
          problems_found = True

    # an instance not marked down must show up on its primary node
    if instanceconfig.status != 'down':
      if (primary not in node_instance or
          instance not in node_instance[primary]):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, primary))
        problems_found = True

    # and it must not show up anywhere else
    for vnode in node_instance:
      if vnode != primary and instance in node_instance[vnode]:
        feedback_fn("  - ERROR: instance %s should not run on node %s" %
                        (instance, vnode))
        problems_found = True

    return problems_found
633

    
634
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    orphans_found = False

    for vnode in node_vol_is:
      expected = node_vol_should.get(vnode, {})
      for volume in node_vol_is[vnode]:
        if volume not in expected:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, vnode))
          orphans_found = True
    return orphans_found
650

    
651
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    orphans_found = False
    known_instances = frozenset(instancelist)
    for vnode in node_instance:
      for running in node_instance[vnode]:
        if running not in known_instances:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (running, vnode))
          orphans_found = True
    return orphans_found
665

    
666
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    not_resilient = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary
      # has enough memory to host all instances it is supposed to should a
      # single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: down instances are counted too, since someone might want
      # to start them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = sum(instance_cfg[iname].memory for iname in instances)
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
                      " failovers should node %s fail" % (node, prinode))
          not_resilient = True
    return not_resilient
693

    
694
  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    skipped = frozenset(self.op.skip_checks)
    self.skip_set = skipped
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(skipped):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
704

    
705
  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    # no pre-phase nodes; post-phase hooks run on every node
    return env, [], all_nodes
716

    
717
  def Exec(self, feedback_fn):
718
    """Verify integrity of cluster, performing various test on nodes.
719

720
    """
721
    bad = False
722
    feedback_fn("* Verifying global settings")
723
    for msg in self.cfg.VerifyConfig():
724
      feedback_fn("  - ERROR: %s" % msg)
725

    
726
    vg_name = self.cfg.GetVGName()
727
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
728
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
729
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
730
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
731
    i_non_redundant = [] # Non redundant instances
732
    node_volume = {}
733
    node_instance = {}
734
    node_info = {}
735
    instance_cfg = {}
736

    
737
    # FIXME: verify OS list
738
    # do local checksums
739
    file_names = []
740
    file_names.append(constants.SSL_CERT_FILE)
741
    file_names.append(constants.CLUSTER_CONF_FILE)
742
    local_checksums = utils.FingerprintFiles(file_names)
743

    
744
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
745
    all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
746
    all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
747
    all_vglist = self.rpc.call_vg_list(nodelist)
748
    node_verify_param = {
749
      'filelist': file_names,
750
      'nodelist': nodelist,
751
      'hypervisor': hypervisors,
752
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
753
                        for node in nodeinfo]
754
      }
755
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
756
                                           self.cfg.GetClusterName())
757
    all_rversion = self.rpc.call_version(nodelist)
758
    all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
759
                                        self.cfg.GetHypervisorType())
760

    
761
    for node in nodelist:
762
      feedback_fn("* Verifying node %s" % node)
763
      result = self._VerifyNode(node, file_names, local_checksums,
764
                                all_vglist[node], all_nvinfo[node],
765
                                all_rversion[node], feedback_fn)
766
      bad = bad or result
767

    
768
      # node_volume
769
      volumeinfo = all_volumeinfo[node]
770

    
771
      if isinstance(volumeinfo, basestring):
772
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
773
                    (node, volumeinfo[-400:].encode('string_escape')))
774
        bad = True
775
        node_volume[node] = {}
776
      elif not isinstance(volumeinfo, dict):
777
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
778
        bad = True
779
        continue
780
      else:
781
        node_volume[node] = volumeinfo
782

    
783
      # node_instance
784
      nodeinstance = all_instanceinfo[node]
785
      if type(nodeinstance) != list:
786
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
787
        bad = True
788
        continue
789

    
790
      node_instance[node] = nodeinstance
791

    
792
      # node_info
793
      nodeinfo = all_ninfo[node]
794
      if not isinstance(nodeinfo, dict):
795
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
796
        bad = True
797
        continue
798

    
799
      try:
800
        node_info[node] = {
801
          "mfree": int(nodeinfo['memory_free']),
802
          "dfree": int(nodeinfo['vg_free']),
803
          "pinst": [],
804
          "sinst": [],
805
          # dictionary holding all instances this node is secondary for,
806
          # grouped by their primary node. Each key is a cluster node, and each
807
          # value is a list of instances which have the key as primary and the
808
          # current node as secondary.  this is handy to calculate N+1 memory
809
          # availability if you can only failover from a primary to its
810
          # secondary.
811
          "sinst-by-pnode": {},
812
        }
813
      except ValueError:
814
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
815
        bad = True
816
        continue
817

    
818
    node_vol_should = {}
819

    
820
    for instance in instancelist:
821
      feedback_fn("* Verifying instance %s" % instance)
822
      inst_config = self.cfg.GetInstanceInfo(instance)
823
      result =  self._VerifyInstance(instance, inst_config, node_volume,
824
                                     node_instance, feedback_fn)
825
      bad = bad or result
826

    
827
      inst_config.MapLVsByNode(node_vol_should)
828

    
829
      instance_cfg[instance] = inst_config
830

    
831
      pnode = inst_config.primary_node
832
      if pnode in node_info:
833
        node_info[pnode]['pinst'].append(instance)
834
      else:
835
        feedback_fn("  - ERROR: instance %s, connection to primary node"
836
                    " %s failed" % (instance, pnode))
837
        bad = True
838

    
839
      # If the instance is non-redundant we cannot survive losing its primary
840
      # node, so we are not N+1 compliant. On the other hand we have no disk
841
      # templates with more than one secondary so that situation is not well
842
      # supported either.
843
      # FIXME: does not support file-backed instances
844
      if len(inst_config.secondary_nodes) == 0:
845
        i_non_redundant.append(instance)
846
      elif len(inst_config.secondary_nodes) > 1:
847
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
848
                    % instance)
849

    
850
      for snode in inst_config.secondary_nodes:
851
        if snode in node_info:
852
          node_info[snode]['sinst'].append(instance)
853
          if pnode not in node_info[snode]['sinst-by-pnode']:
854
            node_info[snode]['sinst-by-pnode'][pnode] = []
855
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
856
        else:
857
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
858
                      " %s failed" % (instance, snode))
859

    
860
    feedback_fn("* Verifying orphan volumes")
861
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
862
                                       feedback_fn)
863
    bad = bad or result
864

    
865
    feedback_fn("* Verifying remaining instances")
866
    result = self._VerifyOrphanInstances(instancelist, node_instance,
867
                                         feedback_fn)
868
    bad = bad or result
869

    
870
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
871
      feedback_fn("* Verifying N+1 Memory redundancy")
872
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
873
      bad = bad or result
874

    
875
    feedback_fn("* Other Notes")
876
    if i_non_redundant:
877
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
878
                  % len(i_non_redundant))
879

    
880
    return not bad
881

    
882
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result, handle it, and send some
    nicely-formatted feedback back to the user.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: previous Exec result

    Returns:
      the (possibly updated) lu_result; set to 1 when any hook failed
      or a communication failure was detected

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn("    Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

    # FIX: the return used to live inside the POST-phase branch above, so
    # any other phase implicitly returned None and clobbered the caller's
    # Exec result; always hand back lu_result instead.
    return lu_result
class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    # we need all nodes and all instances (shared) to build the full
    # LV-to-instance mapping
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    Returns:
      tuple of (unreachable-nodes list, per-node LVM errors dict,
      instances-with-offline-LVs list, per-instance missing-LVs dict);
      the four names below alias the members of the returned tuple

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    # map every expected (node, lv_name) pair to its owning instance,
    # considering only running net-mirrored instances
    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_volume_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
        # FIX: without this continue we would fall through and call
        # .iteritems() on the error string below
        continue
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      # every LV found on the node is popped from nv_dict; whatever is
      # left at the end is missing
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    Both the pre- and post-hooks run on the master node only.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    Resolves the new name, and refuses the rename if nothing would
    change or if the new IP already answers on the network.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      # the new IP must not already be live somewhere on the network
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    Stops the master role, rewrites and redistributes the name/IP
    ssconf keys, and restores the master role even if the update fails.

    NOTE(review): `ss` is not defined in this scope and no sstore
    module appears in the file's imports, so this method looks like it
    would raise NameError if executed -- confirm against the pending
    sstore rework (see the TODO below).

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    if not self.rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      # TODO: sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = self.rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      # best-effort: always try to restore the master role
      if not self.rpc.call_node_start_master(master, False):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")
def _RecursiveCheckIfLVMBased(disk):
  """Check whether a disk, or any of its children, is LVM-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether a LD_LV dev_type was found or not

  """
  # recurse into the children first; an LVM device anywhere in the
  # tree makes the whole disk LVM-based
  for child in (disk.children or []):
    if _RecursiveCheckIfLVMBased(child):
      return True
  return disk.dev_type == constants.LD_LV
class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    Runs on the master node only, before and after the change.

    """
    master = self.cfg.GetMasterNode()
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    return env, [master], [master]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    # FIXME: This only works because there is only one parameter that can be
    # changed or removed.
    if not self.op.vg_name:
      # disabling LVM: no instance may still use an LVM-backed disk
      for instance in self.cfg.GetAllInstancesInfo().values():
        for disk in instance.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
      vg_data = self.rpc.call_vg_list(locked_nodes)
      for node_name in locked_nodes:
        error = utils.CheckVolumeGroupSize(vg_data[node_name],
                                           self.op.vg_name,
                                           constants.MIN_VG_SIZE)
        if error:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node_name, error))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name == self.cfg.GetVGName():
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")
    else:
      self.cfg.SetVGName(self.op.vg_name)
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  Args:
    lu: the calling LU (provides cfg, rpc and proc)
    instance: the instance whose disks are polled on its primary node
    oneshot: if True, poll and report once instead of looping until done
    unlock: unused in this body -- TODO confirm it can be dropped

  Returns:
    boolean, False when any disk was reported degraded while having no
    completion percentage (i.e. not recoverable by waiting)

  """
  if not instance.disks:
    return True

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

  # up to 10 *consecutive* empty answers from the node are tolerated,
  # with a 6-second pause between attempts; any good answer resets the
  # counter
  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      lu.proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        lu.proc.LogWarning("Can't compute data for node %s/%s" %
                           (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        # a percentage means this device is still syncing
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    # sleep at most a minute between polls; NOTE(review): if no device
    # reported an est_time, max_time stays 0 and this busy-polls --
    # confirm whether a minimum sleep is wanted
    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  Returns:
    boolean, True when the device (and all its children) are healthy

  """
  lu.cfg.SetDiskID(dev, node)
  # index into the blockdev_find result tuple: 5 is presumably the
  # is_degraded field, 6 the local-disk status -- confirm against the
  # backend's call_blockdev_find return format
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    # NOTE(review): ldisk is not forwarded here, so children are always
    # checked with ldisk=False -- confirm this is intentional
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
    _CheckOutputFields(static=[],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    self.share_locks[locking.LEVEL_NODE] = 1
    self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}

  def CheckPrereq(self):
    """Check prerequisites.

    """

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into an a per-os per-node dictionary

      Args:
        node_list: a list with the names of all nodes
        rlist: a map with node names as keys and OS objects as values

      Returns:
        map: a map with osnames as keys and as value another map, with
             nodes as
             keys and list of OS objects as values
             e.g. {"debian-etch": {"node1": [<object>,...],
                                   "node2": [<object>,]}
                  }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if not nr:
        continue
      for os_obj in nr:
        if os_obj.name not in all_os:
          # first time we see this OS: pre-seed an empty list for every
          # known node so the result always covers the full node list
          all_os[os_obj.name] = dict([(nname, []) for nname in node_list])
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.acquired_locks[locking.LEVEL_NODE]
    node_data = self.rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    os_map = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in os_map.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          # an OS is valid if its first (preferred) entry is valid on
          # every node
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = dict([(node_name, [(v.status, v.path) for v in nos_list])
                      for node_name, nos_list in os_data.iteritems()])
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output
class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      # FIX: converted from the old comma-form raise statement to the
      # call form used everywhere else in this file
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    # the node may not be primary or secondary for any instance
    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    self.context.RemoveNode(node.name)

    self.rpc.call_node_leave_cluster(node.name)
class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    # fields that require a live RPC call to each node
    self.dynamic_fields = frozenset([
      "dtotal", "dfree",
      "mtotal", "mnode", "mfree",
      "bootid",
      "ctotal",
      ])

    # fields answerable from the configuration alone
    self.static_fields = frozenset([
      "name", "pinst_cnt", "sinst_cnt",
      "pinst_list", "sinst_list",
      "pip", "sip", "tags",
      "serial_no",
      ])

    _CheckOutputFields(static=self.static_fields,
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted


  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in the _GetWantedNodes,
    # if non empty, and if empty, there's no validation to do
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    Returns:
      a list of rows, one per selected node, each row holding the
      values of self.op.output_fields in order

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      # locked mode: the acquired locks define the node set
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      # unlocked with explicit names: verify they still exist
      nodenames = self.wanted
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
    else:
      nodenames = all_info.keys()
    nodelist = [all_info[name] for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      # at least one live field requested: query the nodes
      live_data = {}
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                          self.cfg.GetHypervisorType())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          # node unreachable: dynamic fields will show as None
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    # instance counts/lists are only computed if actually requested
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field == "serial_no":
          val = node.serial_no
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False

  def ExpandNames(self):
    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)
    else:
      # no explicit nodes given: query them all
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = self.rpc.call_node_volumes(nodenames)

    instances = [self.cfg.GetInstanceInfo(iname)
                 for iname in self.cfg.GetInstanceList()]

    # per-instance map of node -> owned LVs, to resolve the "instance"
    # output field
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in instances])

    output = []
    for node in nodenames:
      # skip unreachable nodes and nodes without volumes
      if node not in volumes or not volumes[node]:
        continue

      for vol in sorted(volumes[node], key=lambda vol: vol['dev']):
        row = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            # find the instance owning this LV, '-' when orphaned
            val = '-'
            for inst in instances:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
          else:
            raise errors.ParameterError(field)
          row.append(str(val))

        output.append(row)

    return output
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    # pre-hooks run on the current members only; post-hooks also on the
    # node being added
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) matches the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    # resolve the name; HostInfo raises if the name is not resolvable
    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    # without an explicit secondary IP the node is treated as single-homed
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      # on readd, the node must keep its previous IP configuration
      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      # no IP of the new node may collide with any IP of an existing node
      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachablity
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    # the node object is only created here; it is committed to the
    # configuration at the end of Exec via context.AddNode/ReaddNode
    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    result = self.rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    # host DSA/RSA key pairs plus the cluster ssh user's key pair
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
                                    keyarray[2],
                                    keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    # for dual-homed nodes, verify that the node itself can reach the
    # secondary IP it was configured with
    if new_node.secondary_ip != new_node.primary_ip:
      if not self.rpc.call_node_tcp_ping(new_node.name,
                                         constants.LOCALHOST_IP_ADDRESS,
                                         new_node.secondary_ip,
                                         constants.DEFAULT_NODED_PORT,
                                         10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    # ask the master node to verify ssh/hostname setup towards the new node
    node_verify_list = [self.cfg.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
                                       self.cfg.GetClusterName())
    for verifier in node_verify_list:
      if not result[verifier]:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                 " for remote verification" % verifier)
      if result[verifier]['nodelist']:
        for failed in result[verifier]['nodelist']:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, result[verifier]['nodelist'][failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      # on a fresh add the node is not yet in the configuration's node
      # list, so it must be appended explicitly
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = self.rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          # best-effort: log and continue with the remaining nodes
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = []
    if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      result = self.rpc.call_upload_file([node], fname)
      if not result[node]:
        logger.Error("could not copy file %s to node %s" % (fname, node))

    # finally commit the node to the cluster configuration
    if self.op.readd:
      self.context.ReaddNode(new_node)
    else:
      self.context.AddNode(new_node)
1831
class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False
  REQ_BGL = False

  def ExpandNames(self):
    # reading the configuration needs no locks
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    arch = (platform.architecture()[0], platform.machine())
    return {
      "name": self.cfg.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.cfg.GetMasterNode(),
      "architecture": arch,
      "hypervisor_type": self.cfg.GetHypervisorType(),
      "enabled_hypervisors": self.cfg.GetClusterInfo().enabled_hypervisors,
      }
1867

    
1868
class LUQueryConfigValues(NoHooksLU):
  """Return configuration values.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    # reading configuration values needs no locks
    self.needed_locks = {}

    _CheckOutputFields(static=["cluster_name", "master_node"],
                       dynamic=[],
                       selected=self.op.output_fields)

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    # map each supported field to the accessor providing its value
    getters = {
      "cluster_name": self.cfg.GetClusterName,
      "master_node": self.cfg.GetMasterNode,
      }
    values = []
    for field in self.op.output_fields:
      if field not in getters:
        raise errors.ParameterError(field)
      values.append(getters[field]())
    return values

    
1904
class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # the node locks are computed later, once the instance lock is held
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    ok, device_info = _AssembleInstanceDisks(self, self.instance)
    if ok:
      # the mapping from node devices to instance-visible devices
      return device_info
    raise errors.OpExecError("Cannot activate block devices")
    
1941
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    lu: the logical unit on whose behalf we execute
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    a (disks_ok, device_info) tuple; disks_ok is a boolean and
    device_info a list of (node, instance_visible_name, node_device)
    tuples, one per instance disk, with the mapping from node devices
    to instance devices
  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occured, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        # first-pass failures only count when secondaries must succeed
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        # primary-node failures are always fatal for the result
        disks_ok = False
    # NOTE: 'result' here is the value from the primary-node assemble of
    # this disk (the last matching iteration of the inner loop)
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info

    
2003
def _StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  Assembles the instance's disks and raises OpExecError on failure,
  after shutting down whatever was partially assembled.

  """
  disks_ok, _ = _AssembleInstanceDisks(lu, instance,
                                       ignore_secondaries=force)
  if disks_ok:
    return
  # roll back any partially-assembled devices before failing
  _ShutdownInstanceDisks(lu, instance)
  if force is not None and not force:
    logger.Error("If the message above refers to a secondary node,"
                 " you can retry the operation using '--force'.")
  raise errors.OpExecError("Disk consistency error")
    
2017
class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # the node locks are computed later, once the instance lock is held
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    # refuses to act if the instance is still running
    _SafeShutdownInstanceDisks(self, self.instance)
    
2051
def _SafeShutdownInstanceDisks(lu, instance):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  @param lu: the logical unit on whose behalf we execute
  @param instance: the instance whose disks we shut down
  @raise errors.OpExecError: if the primary node cannot be contacted,
      or the instance is still running

  """
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
                                      [instance.hypervisor])
  ins_l = ins_l[instance.primary_node]
  # a non-list answer means the RPC call to the primary node failed;
  # use isinstance instead of the non-idiomatic "not type(x) is list"
  if not isinstance(ins_l, list):
    raise errors.OpExecError("Can't contact node '%s'" %
                             instance.primary_node)

  if instance.name in ins_l:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")

  _ShutdownInstanceDisks(lu, instance)
    
2072
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are ignored
  (they do not cause a False return value); errors on the secondary
  nodes always make the function return False.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      if not lu.rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        # only count the failure unless it is on the primary node and
        # primary failures were explicitly requested to be ignored
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result
    
2093
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
  """Checks if a node has enough free memory.

  This function check if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raise an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node: C{str}
  @param node: the node to check
  @type reason: C{str}
  @param reason: string to use in the error message
  @type requested: C{int}
  @param requested: the amount of memory in MiB to check for
  @type hypervisor: C{str}
  @param hypervisor: the hypervisor to ask for memory stats
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
      we cannot check the node

  """
  # NOTE(review): the 'hypervisor' parameter shadows the ganeti.hypervisor
  # module inside this function; the name is kept for compatibility.
  node_data = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
  if not (node_data and isinstance(node_data, dict)):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                             " information" % (node,))

  free_mem = node_data[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                             " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                             " needed %s MiB, available %s MiB" %
                             (node, reason, requested, free_mem))
    
2130
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # the node locks are computed later, once the instance lock is held
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    run_nodes = [self.cfg.GetMasterNode(), self.instance.primary_node]
    run_nodes.extend(self.instance.secondary_nodes)
    return env, run_nodes, run_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

    # check bridges existance
    _CheckInstanceBridgesExist(self, instance)

    # the primary node must be able to host the instance's memory
    _CheckNodeFreeMemory(self, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory, instance.hypervisor)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    # record the new desired state in the configuration first
    self.cfg.MarkInstanceUp(instance.name)

    primary = instance.primary_node

    _StartInstanceDisks(self, instance, force)

    if not self.rpc.call_instance_start(primary, instance, extra_args):
      # roll back the disk activation on failure
      _ShutdownInstanceDisks(self, instance)
      raise errors.OpExecError("Could not start instance")
    
2198
class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    # validate the reboot type before taking any locks
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Only a full reboot (shutdown + disk deactivation/reactivation +
      # start) touches the secondary nodes; soft and hard reboots need
      # only the primary node.
      # Bug fix: this used to be "not constants.INSTANCE_REBOOT_FULL",
      # which is the negation of a non-empty constant and thus always
      # False, so all nodes were locked for every reboot type.
      primary_only = self.op.reboot_type != constants.INSTANCE_REBOOT_FULL
      self._LockInstancesNodes(primary_only=primary_only)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # check bridges existance
    _CheckInstanceBridgesExist(self, instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      # soft/hard reboots are handled entirely by the hypervisor
      if not self.rpc.call_instance_reboot(node_current, instance,
                                           reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      # full reboot: stop the instance, cycle its disks, start it again
      if not self.rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(self, instance)
      _StartInstanceDisks(self, instance, ignore_secondaries)
      if not self.rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)
    
2279
class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # the node locks are computed later, once the instance lock is held
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    run_nodes = [self.cfg.GetMasterNode(), self.instance.primary_node]
    run_nodes.extend(self.instance.secondary_nodes)
    return env, run_nodes, run_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    # record the new desired state in the configuration first
    self.cfg.MarkInstanceDown(instance.name)
    if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
      # best-effort: still try to bring the disks down below
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(self, instance)
    
2331
class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # the node locks are computed later, once the instance lock is held
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    # double-check with the hypervisor that the instance is really down
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        # Bug fix: this message used to reference self.op.pnode, which
        # does not exist on this opcode (_OP_REQP is only instance_name)
        # and would have raised AttributeError instead of OpPrereqError.
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
      os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node" % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.Update(inst)

    # the disks must be active while the OS create scripts run; make
    # sure they are shut down again even if the installation fails
    _StartInstanceDisks(self, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not self.rpc.call_instance_os_add(inst.primary_node, inst,
                                           "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(self, inst)
    
2422
class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    # the config says "down"; double-check with the primary node that the
    # instance is really not running there
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    # normalise the requested name to its resolved form
    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    # unless explicitly overridden, refuse a new name whose IP already
    # answers on the noded port (i.e. is in use by some host)
    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    # for file-based disks, remember the old storage directory before the
    # rename changes the disks' logical IDs
    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
                                                     old_file_storage_dir,
                                                     new_file_storage_dir)

      # result is falsy when the RPC itself failed...
      if not result:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      # ...and result[0] is the remote operation's success flag
      if not result[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    # run the OS rename script; a failure here is only logged, since the
    # configuration rename has already taken place
    _StartInstanceDisks(self, inst, None)
    try:
      if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
                                               old_name,
                                               "sda", "sdb"):
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(self, inst)
class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]
  REQ_BGL = False

  def ExpandNames(self):
    """Lock the instance and prepare for node lock computation.

    """
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    """Acquire the instance's node locks at the node level.

    """
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    The hook node list contains only the master node.

    """
    node_list = [self.cfg.GetMasterNode()]
    env = _BuildInstanceHookEnvByObject(self.instance)
    return env, node_list, node_list

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    Shuts the instance down, removes its disks and finally drops it
    from the configuration; with ignore_failures set, shutdown/disk
    errors are reported but do not abort the removal.

    """
    inst = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (inst.name, inst.primary_node))

    if not self.rpc.call_instance_shutdown(inst.primary_node, inst):
      if not self.op.ignore_failures:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (inst.name, inst.primary_node))
      feedback_fn("Warning: can't shutdown instance")

    logger.Info("removing block devices for instance %s" % inst.name)

    if not _RemoveDisks(self, inst):
      if not self.op.ignore_failures:
        raise errors.OpExecError("Can't remove instance's disks")
      feedback_fn("Warning: can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % inst.name)

    self.cfg.RemoveInstance(inst.name)
    self.remove_locks[locking.LEVEL_INSTANCE] = inst.name
class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    # "dynamic" fields need a live query of the nodes; "static" fields
    # can be answered from the configuration alone
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    self.static_fields = frozenset([
      "name", "os", "pnode", "snodes",
      "admin_state", "admin_ram",
      "disk_template", "ip", "mac", "bridge",
      "sda_size", "sdb_size", "vcpus", "tags",
      "network_port", "kernel_path", "initrd_path",
      "hvm_boot_order", "hvm_acpi", "hvm_pae",
      "hvm_cdrom_image_path", "hvm_nic_type",
      "hvm_disk_type", "vnc_bind_address",
      "serial_no", "hypervisor",
      ])
    _CheckOutputFields(static=self.static_fields,
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    # queries are read-only, so every lock is taken in shared mode
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    # locks are only needed when a dynamic field was requested; a purely
    # static query runs unlocked off the configuration data
    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.do_locking:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllInstancesInfo()
    if self.do_locking:
      # when locking, the acquired locks define exactly the instances
      # we report on
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
    elif self.wanted != locking.ALL_SET:
      # explicit name list but running unlocked: some of the instances
      # may have been removed in the meantime
      instance_names = self.wanted
      missing = set(instance_names).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some instances were removed before retrieving their data: %s"
          % missing)
    else:
      instance_names = all_info.keys()
    instance_list = [all_info[iname] for iname in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])
    hv_list = list(set([inst.hypervisor for inst in instance_list]))

    # query the nodes for live data only when a dynamic field is wanted
    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          # the RPC to this node failed; remember it so the dynamic
          # fields of its instances can be reported as unknown
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          # combined administrative/operational status
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          disk = instance.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        elif field == "vcpus":
          val = instance.vcpus
        elif field == "tags":
          val = list(instance.GetTags())
        elif field == "serial_no":
          val = instance.serial_no
        elif field in ("network_port", "kernel_path", "initrd_path",
                       "hvm_boot_order", "hvm_acpi", "hvm_pae",
                       "hvm_cdrom_image_path", "hvm_nic_type",
                       "hvm_disk_type", "vnc_bind_address"):
          # optional attributes: unset values show as "default" or "-"
          # depending on the field
          val = getattr(instance, field, None)
          if val is not None:
            pass
          elif field in ("hvm_nic_type", "hvm_disk_type",
                         "kernel_path", "initrd_path"):
            val = "default"
          else:
            val = "-"
        elif field == "hypervisor":
          val = instance.hypervisor
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output
class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # only network-mirrored disk templates have a remote copy to fail to
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
                         instance.name, instance.memory,
                         instance.hypervisor)

    # check bridge existance
    brlist = [nic.bridge for nic in instance.nics]
    if not self.rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self, dev, target_node, False):
        # a degraded disk aborts the failover only for instances marked
        # "up", and then only unless explicitly overridden
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not self.rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down"  %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logger.Info("Starting instance %s on node %s" %
                  (instance.name, target_node))

      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not self.rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))
def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
2894
  """Create a tree of block devices on the primary node.
2895

2896
  This always creates all devices.
2897

2898
  """
2899
  if device.children:
2900
    for child in device.children:
2901
      if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
2902
        return False
2903

    
2904
  lu.cfg.SetDiskID(device, node)
2905
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
2906
                                       instance.name, True, info)
2907
  if not new_id:
2908
    return False
2909
  if device.physical_id is None:
2910
    device.physical_id = new_id
2911
  return True
2912

    
2913

    
2914
def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
2915
  """Create a tree of block devices on a secondary node.
2916

2917
  If this device type has to be created on secondaries, create it and
2918
  all its children.
2919

2920
  If not, just recurse to children keeping the same 'force' value.
2921

2922
  """
2923
  if device.CreateOnSecondary():
2924
    force = True
2925
  if device.children:
2926
    for child in device.children:
2927
      if not _CreateBlockDevOnSecondary(lu, node, instance,
2928
                                        child, force, info):
2929
        return False
2930

    
2931
  if not force:
2932
    return True
2933
  lu.cfg.SetDiskID(device, node)
2934
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
2935
                                       instance.name, False, info)
2936
  if not new_id:
2937
    return False
2938
  if device.physical_id is None:
2939
    device.physical_id = new_id
2940
  return True
2941

    
2942

    
2943
def _GenerateUniqueNames(lu, exts):
2944
  """Generate a suitable LV name.
2945

2946
  This will generate a logical volume name for the given instance.
2947

2948
  """
2949
  results = []
2950
  for val in exts:
2951
    new_id = lu.cfg.GenerateUniqueID()
2952
    results.append("%s%s" % (new_id, val))
2953
  return results
2954

    
2955

    
2956
def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
                         p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  The result is a DRBD8 disk between the given primary and secondary
  nodes, backed by two logical volumes (data plus a 128MB metadata
  volume), using a newly allocated port and a fresh shared secret.

  """
  port = lu.cfg.AllocatePort()
  vgname = lu.cfg.GetVGName()
  shared_secret = lu.cfg.GenerateDRBDSecret()
  data_dev = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  meta_dev = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  return objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                      logical_id=(primary, secondary, port,
                                  p_minor, s_minor,
                                  shared_secret),
                      children=[data_dev, meta_dev],
                      iv_name=iv_name)
def _GenerateDiskTemplate(lu, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  Builds the two-disk (sda/sdb) layout as configuration objects for the
  requested template; the block devices themselves are created later
  (see _CreateDisks).

  """
  #TODO: compute space requirements

  vgname = lu.cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
    disks = []
  elif template_name == constants.DT_PLAIN:
    # plain LVs live on the primary node only
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(lu, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name = "sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name = "sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == constants.DT_DRBD8:
    # drbd8 mirrors between the primary and exactly one secondary
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    # minors are requested as [primary, primary, remote, remote], so
    # (pa, pb) are the primary's minors for sda/sdb and (sa, sb) the
    # secondary's
    (minor_pa, minor_pb,
     minor_sa, minor_sb) = lu.cfg.AllocateDRBDMinor(
      [primary_node, primary_node, remote_node, remote_node], instance_name)

    names = _GenerateUniqueNames(lu, [".sda_data", ".sda_meta",
                                      ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                        disk_sz, names[0:2], "sda",
                                        minor_pa, minor_sa)
    drbd_sdb_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                        swap_sz, names[2:4], "sdb",
                                        minor_pb, minor_sb)
    disks = [drbd_sda_dev, drbd_sdb_dev]
  elif template_name == constants.DT_FILE:
    # file-backed disks also live on the primary node only
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
                                iv_name="sda", logical_id=(file_driver,
                                "%s/sda" % file_storage_dir))
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
                                iv_name="sdb", logical_id=(file_driver,
                                "%s/sdb" % file_storage_dir))
    disks = [file_sda_dev, file_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks
def _GetInstanceInfoText(instance):
3035
  """Compute that text that should be added to the disk's metadata.
3036

3037
  """
3038
  return "originstname+%s" % instance.name
3039

    
3040

    
3041
def _CreateDisks(lu, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)
  pnode = instance.primary_node

  # file-based disks need their storage directory created up-front
  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)

    if not result:
      logger.Error("Could not connect to node '%s'" % pnode)
      return False

    if not result[0]:
      logger.Error("failed to create directory '%s'" % file_storage_dir)
      return False

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    # the secondary nodes are handled before the primary
    #HARDCODE
    for snode in instance.secondary_nodes:
      created = _CreateBlockDevOnSecondary(lu, snode, instance,
                                           device, False, info)
      if not created:
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, snode))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(lu, pnode, instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False

  return True
def _RemoveDisks(lu, instance):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  all_removed = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(disk, node)
      if not lu.rpc.call_blockdev_remove(node, disk):
        # log and carry on: the other devices are still attempted
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        all_removed = False

  # for file-based disks, also drop the storage directory itself
  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    dir_removed = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
                                                      file_storage_dir)
    if not dir_removed:
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      all_removed = False

  return all_removed
def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group

  This is currently hard-coded for the two-drive layout.

  Returns the required free space in the volume group, or None for
  templates that consume no VG space; raises ProgrammerError for an
  unknown template.

  """
  if disk_template == constants.DT_DISKLESS:
    return None
  if disk_template == constants.DT_PLAIN:
    return disk_size + swap_size
  if disk_template == constants.DT_DRBD8:
    # 256 MB are added for drbd metadata, 128MB for each drbd device
    return disk_size + swap_size + 256
  if disk_template == constants.DT_FILE:
    # file-backed storage does not use the volume group
    return None
  raise errors.ProgrammerError("Disk template '%s' size requirement"
                               " is unknown" % disk_template)
class LUCreateInstance(LogicalUnit):
  """Create an instance.

  Depending on self.op.mode the instance is either created from
  scratch (INSTANCE_CREATE, the OS create scripts are run) or restored
  from an export found on another node (INSTANCE_IMPORT).

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  # mandatory opcode parameters; optional ones are defaulted to None
  # in ExpandNames below
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync", "ip_check", "mac"]
  REQ_BGL = False

  def _ExpandNode(self, node):
    """Expands and checks one node name.

    Args:
      node: the (possibly short) node name to expand

    Returns:
      The fully-expanded node name; raises OpPrereqError if the node
      is not known to the configuration.

    """
    node_full = self.cfg.ExpandNodeName(node)
    if node_full is None:
      raise errors.OpPrereqError("Unknown node %s" % node)
    return node_full

  def ExpandNames(self):
    """ExpandNames for CreateInstance.

    Figure out the right locks for instance creation.

    Besides computing the locks, this normalizes the opcode (optional
    attributes are defaulted to None, instance name and IP resolved)
    and performs the cheap, config-only validity checks.

    """
    self.needed_locks = {}

    # set optional parameters to none if they don't exist
    for attr in ["kernel_path", "initrd_path", "pnode", "snode",
                 "iallocator", "hvm_boot_order", "hvm_acpi", "hvm_pae",
                 "hvm_cdrom_image_path", "hvm_nic_type", "hvm_disk_type",
                 "vnc_bind_address", "hypervisor"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    # cheap checks, mostly valid constants given

    # verify creation mode
    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    # fall back to the cluster-wide default hypervisor if none was given
    if self.op.hypervisor is None:
      self.op.hypervisor = self.cfg.GetHypervisorType()

    enabled_hvs = self.cfg.GetClusterInfo().enabled_hypervisors
    if self.op.hypervisor not in enabled_hvs:
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
                                 " cluster (%s)" % (self.op.hypervisor,
                                  ",".join(enabled_hvs)))

    #### instance parameters check

    # instance name verification (also resolves the name to an IP)
    hostname1 = utils.HostInfo(self.op.instance_name)
    self.op.instance_name = instance_name = hostname1.name

    # this is just a preventive check, but someone might still add this
    # instance in the meantime, and creation will fail at lock-add time
    if instance_name in self.cfg.GetInstanceList():
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    self.add_locks[locking.LEVEL_INSTANCE] = instance_name

    # ip validity checks: "none" disables the IP, "auto" uses the
    # resolved address, anything else must be a valid IP
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = self.op.ip = inst_ip
    # used in CheckPrereq for ip ping check
    self.check_ip = hostname1.ip

    # MAC address verification ("auto" means one is generated at Exec time)
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")
    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    # the directory must be relative: Exec joins it below the
    # cluster-wide file storage dir
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory path not absolute")

    ### Node/iallocator related checks
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator:
      # the allocator may pick any node, so all of them must be locked
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.op.pnode = self._ExpandNode(self.op.pnode)
      nodelist = [self.op.pnode]
      if self.op.snode is not None:
        self.op.snode = self._ExpandNode(self.op.snode)
        nodelist.append(self.op.snode)
      self.needed_locks[locking.LEVEL_NODE] = nodelist

    # in case of import lock the source node too
    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)

      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      self.op.src_node = src_node = self._ExpandNode(src_node)
      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
        self.needed_locks[locking.LEVEL_NODE].append(src_node)

    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    On success this sets self.op.pnode (and self.op.snode when the
    allocator returned two nodes) from the allocator's answer.

    """
    disks = [{"size": self.op.disk_size, "mode": "w"},
             {"size": self.op.swap_size, "mode": "w"}]
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
             "bridge": self.op.bridge}]
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.op.vcpus,
                     mem_size=self.op.mem_size,
                     disks=disks,
                     nics=nics,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    logger.ToStdout("Selected nodes for the instance: %s" %
                    (", ".join(ial.nodes),))
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
                (self.op.instance_name, self.op.iallocator, ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    ))

    # hooks run on the master, the primary and all secondaries
    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    Verifies the import source (if any), IP availability, node
    disk/memory capacity, guest OS validity, bridge existence and the
    hypervisor-specific parameters; also runs the iallocator when one
    was requested, fixing self.op.pnode/snode.

    """
    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = self.op.src_node
      src_path = self.op.src_path

      # read the export metadata from the source node
      export_info = self.rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage

    # ip ping checks (we use the same ip that was resolved in ExpandNames)

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      # a reachable IP means the address is already in use somewhere
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (self.check_ip, self.op.instance_name))

    # bridge verification (default to the cluster bridge if not given)
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    #### allocator run

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
    assert self.pnode is not None, \
      "Cannot retrieve locked node %s" % self.op.pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if self.op.snode is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")
      if self.op.snode == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(self.op.snode)

    # None means the template does not consume volume-group space
    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                         self.op.hypervisor)
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node"  % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")

    # bridge check on primary node
    if not self.rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # memory check on primary node (only needed if we start the instance)
    if self.op.start:
      _CheckNodeFreeMemory(self, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.op.mem_size, self.op.hypervisor)

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      # FIXME (als): shouldn't these checks happen on the destination node?
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    # Xen HVM device type checks
    if self.op.hypervisor == constants.HT_XEN_HVM:
      if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
        raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
                                   " hypervisor" % self.op.hvm_nic_type)
      if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
        raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
                                   " hypervisor" % self.op.hvm_disk_type)

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    Creates the disks, adds the instance to the configuration,
    optionally waits for the mirrors to sync, runs the OS
    creation/import scripts and finally starts the instance if
    requested; failures roll back the disks (and config entry).

    """
    # note: 'instance' holds the instance *name*, not an Instance object
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    # allocate a network port for hypervisors that require one
    ht_kind = self.op.hypervisor
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    if self.op.vnc_bind_address is None:
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.cfg.GetFileStorageDir(),
                                        string_file_storage_dir, instance))

    disks = _GenerateDiskTemplate(self,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            hvm_acpi=self.op.hvm_acpi,
                            hvm_pae=self.op.hvm_pae,
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
                            vnc_bind_address=self.op.vnc_bind_address,
                            hvm_nic_type=self.op.hvm_nic_type,
                            hvm_disk_type=self.op.hvm_disk_type,
                            hypervisor=self.op.hypervisor,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self, iobj):
      # roll back any devices that were created before the failure
      _RemoveDisks(self, iobj)
      self.cfg.ReleaseDRBDMinors(instance)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Declare that we don't want to remove the instance lock anymore, as we've
    # added the instance to the config
    del self.remove_locks[locking.LEVEL_INSTANCE]
    # Remove the temp. assignements for the instance's drbds
    self.cfg.ReleaseDRBDMinors(instance)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, iobj)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      # degraded disks: undo both the disks and the config entry
      _RemoveDisks(self, iobj)
      self.cfg.RemoveInstance(iobj.name)
      # Make sure the instance lock gets removed
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not self.rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        cluster_name = self.cfg.GetClusterName()
        if not self.rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                                src_node, src_image,
                                                cluster_name):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not self.rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")

    
3645

    
3646
class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand the instance name and acquire its lock."""
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Return the ssh command line for the instance's console.

    """
    inst = self.instance
    pnode = inst.primary_node

    # verify the instance is actually running on its primary node
    running = self.rpc.call_instance_list([pnode],
                                          [inst.hypervisor])[pnode]
    if running is False:
      raise errors.OpExecError("Can't connect to node %s." % pnode)

    if inst.name not in running:
      raise errors.OpExecError("Instance %s is not running." % inst.name)

    logger.Debug("connecting to console of %s on %s" % (inst.name, pnode))

    # ask the hypervisor for the console command...
    hyper = hypervisor.GetHypervisor(inst.hypervisor)
    console_cmd = hyper.GetShellCommandForConsole(inst)

    # ...and wrap it into an ssh invocation for the primary node
    return self.ssh.BuildCmd(pnode, "root", console_cmd, batch=True, tty=True)

    
3693

    
3694
class LUReplaceDisks(LogicalUnit):
3695
  """Replace the disks of an instance.
3696

3697
  """
3698
  HPATH = "mirrors-replace"
3699
  HTYPE = constants.HTYPE_INSTANCE
3700
  _OP_REQP = ["instance_name", "mode", "disks"]
3701
  REQ_BGL = False
3702

    
3703
  def ExpandNames(self):
    """Compute the node locks needed for the disk replacement.

    When an iallocator will pick the new secondary we must lock all
    nodes; otherwise only the nodes involved in the operation.

    """
    self._ExpandAndLockInstance()

    # normalize the optional remote_node attribute
    self.op.remote_node = getattr(self.op, "remote_node", None)

    if getattr(self.op, "iallocator", None) is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      # the allocator may pick any node, so lock them all
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    elif self.op.remote_node is not None:
      new_secondary = self.cfg.ExpandNodeName(self.op.remote_node)
      if new_secondary is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.op.remote_node = new_secondary
      self.needed_locks[locking.LEVEL_NODE] = [new_secondary]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      # the instance's own nodes will be added in DeclareLocks
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    """Declare the instance's nodes at the node lock level.

    """
    if level != locking.LEVEL_NODE:
      return
    # If we're not already locking all nodes in the set we have to declare the
    # instance's primary/secondary nodes.
    if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
      self._LockInstancesNodes()

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    On success the chosen node is stored in self.op.remote_node.

    Raises:
      errors.OpPrereqError: if the allocator run fails or it returns an
        unexpected number of nodes

    """
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      # FIX: the format string has three placeholders but only two
      # arguments were supplied, so raising this error would itself
      # fail with "TypeError: not enough arguments for format string";
      # add the iallocator name as the first argument
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    logger.ToStdout("Selected new secondary for the instance: %s" %
                    self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    # hooks run on the master, the primary and (if given) the new secondary
    node_list = [self.cfg.GetMasterNode(), self.instance.primary_node]
    if self.op.remote_node is not None:
      node_list.append(self.op.remote_node)
    return env, node_list, node_list

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    # let the allocator choose the new secondary, if one was requested
    if getattr(self.op, "iallocator", None) is not None:
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is None:
      self.remote_node_info = None
    else:
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node

    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")

    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    # finally, make sure all requested disks actually exist
    for disk_name in self.op.disks:
      if instance.FindDisk(disk_name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (disk_name, instance.name))

  def _ExecD8DiskOnly(self, feedback_fn):
3850
    """Replace a disk on the primary or secondary for dbrd8.
3851

3852
    The algorithm for replace is quite complicated:
3853
      - for each disk to be replaced:
3854
        - create new LVs on the target node with unique names
3855
        - detach old LVs from the drbd device
3856
        - rename old LVs to name_replaced.<time_t>
3857
        - rename new LVs to old LVs
3858
        - attach the new LVs (with the old names now) to the drbd device
3859
      - wait for sync across all devices
3860
      - for each modified disk:
3861
        - remove old LVs (which have the name name_replaces.<time_t>)
3862

3863
    Failures are not very well handled.
3864

3865
    """
3866
    steps_total = 6
3867
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3868
    instance = self.instance
3869
    iv_names = {}
3870
    vgname = self.cfg.GetVGName()
3871
    # start of work
3872
    cfg = self.cfg
3873
    tgt_node = self.tgt_node
3874
    oth_node = self.oth_node
3875

    
3876
    # Step: check device activation
3877
    self.proc.LogStep(1, steps_total, "check device existence")
3878
    info("checking volume groups")
3879
    my_vg = cfg.GetVGName()
3880
    results = self.rpc.call_vg_list([oth_node, tgt_node])
3881
    if not results:
3882
      raise errors.OpExecError("Can't list volume groups on the nodes")
3883
    for node in oth_node, tgt_node:
3884
      res = results.get(node, False)
3885
      if not res or my_vg not in res:
3886
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3887
                                 (my_vg, node))
3888
    for dev in instance.disks:
3889
      if not dev.iv_name in self.op.disks:
3890
        continue
3891
      for node in tgt_node, oth_node:
3892
        info("checking %s on %s" % (dev.iv_name, node))
3893
        cfg.SetDiskID(dev, node)
3894
        if not self.rpc.call_blockdev_find(node, dev):
3895
          raise errors.OpExecError("Can't find device %s on node %s" %
3896
                                   (dev.iv_name, node))
3897

    
3898
    # Step: check other node consistency
3899
    self.proc.LogStep(2, steps_total, "check peer consistency")
3900
    for dev in instance.disks:
3901
      if not dev.iv_name in self.op.disks:
3902
        continue
3903
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3904
      if not _CheckDiskConsistency(self, dev, oth_node,
3905
                                   oth_node==instance.primary_node):
3906
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3907
                                 " to replace disks on this node (%s)" %
3908
                                 (oth_node, tgt_node))
3909

    
3910
    # Step: create new storage
3911
    self.proc.LogStep(3, steps_total, "allocate new storage")
3912
    for dev in instance.disks:
3913
      if not dev.iv_name in self.op.disks:
3914
        continue
3915
      size = dev.size
3916
      cfg.SetDiskID(dev, tgt_node)
3917
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3918
      names = _GenerateUniqueNames(self, lv_names)
3919
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3920
                             logical_id=(vgname, names[0]))
3921
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3922
                             logical_id=(vgname, names[1]))
3923
      new_lvs = [lv_data, lv_meta]
3924
      old_lvs = dev.children
3925
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3926
      info("creating new local storage on %s for %s" %
3927
           (tgt_node, dev.iv_name))
3928
      # since we *always* want to create this LV, we use the
3929
      # _Create...OnPrimary (which forces the creation), even if we
3930
      # are talking about the secondary node
3931
      for new_lv in new_lvs:
3932
        if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
3933
                                        _GetInstanceInfoText(instance)):
3934
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3935
                                   " node '%s'" %
3936
                                   (new_lv.logical_id[1], tgt_node))
3937

    
3938
    # Step: for each lv, detach+rename*2+attach
3939
    self.proc.LogStep(4, steps_total, "change drbd configuration")
3940
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3941
      info("detaching %s drbd from local storage" % dev.iv_name)
3942
      if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3943
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3944
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3945
      #dev.children = []
3946
      #cfg.Update(instance)
3947

    
3948
      # ok, we created the new LVs, so now we know we have the needed
3949
      # storage; as such, we proceed on the target node to rename
3950
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3951
      # using the assumption that logical_id == physical_id (which in
3952
      # turn is the unique_id on that node)
3953

    
3954
      # FIXME(iustin): use a better name for the replaced LVs
3955
      temp_suffix = int(time.time())
3956
      ren_fn = lambda d, suff: (d.physical_id[0],
3957
                                d.physical_id[1] + "_replaced-%s" % suff)
3958
      # build the rename list based on what LVs exist on the node
3959
      rlist = []
3960
      for to_ren in old_lvs:
3961
        find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
3962
        if find_res is not None: # device exists
3963
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3964

    
3965
      info("renaming the old LVs on the target node")
3966
      if not self.rpc.call_blockdev_rename(tgt_node, rlist):
3967
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3968
      # now we rename the new LVs to the old LVs
3969
      info("renaming the new LVs on the target node")
3970
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3971
      if not self.rpc.call_blockdev_rename(tgt_node, rlist):
3972
        raise errors.OpExecError("Can't rename new LVs on node %s"