Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib.py @ e69d05fd

History | View | Annotate | Download (190.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33
import logging
34

    
35
from ganeti import rpc
36
from ganeti import ssh
37
from ganeti import logger
38
from ganeti import utils
39
from ganeti import errors
40
from ganeti import hypervisor
41
from ganeti import locking
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import opcodes
45
from ganeti import serializer
46

    
47

    
48
class LogicalUnit(object):
49
  """Logical Unit base class.
50

51
  Subclasses must follow these rules:
52
    - implement ExpandNames
53
    - implement CheckPrereq
54
    - implement Exec
55
    - implement BuildHooksEnv
56
    - redefine HPATH and HTYPE
57
    - optionally redefine their run requirements:
58
        REQ_MASTER: the LU needs to run on the master node
59
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60

61
  Note that all commands require root permissions.
62

63
  """
64
  HPATH = None
65
  HTYPE = None
66
  _OP_REQP = []
67
  REQ_MASTER = True
68
  REQ_BGL = True
69

    
70
  def __init__(self, processor, op, context):
71
    """Constructor for LogicalUnit.
72

73
    This needs to be overriden in derived classes in order to check op
74
    validity.
75

76
    """
77
    self.proc = processor
78
    self.op = op
79
    self.cfg = context.cfg
80
    self.context = context
81
    # Dicts used to declare locking needs to mcpu
82
    self.needed_locks = None
83
    self.acquired_locks = {}
84
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85
    self.add_locks = {}
86
    self.remove_locks = {}
87
    # Used to force good behavior when calling helper functions
88
    self.recalculate_locks = {}
89
    self.__ssh = None
90

    
91
    for attr_name in self._OP_REQP:
92
      attr_val = getattr(op, attr_name, None)
93
      if attr_val is None:
94
        raise errors.OpPrereqError("Required parameter '%s' missing" %
95
                                   attr_name)
96

    
97
    if not self.cfg.IsCluster():
98
      raise errors.OpPrereqError("Cluster not initialized yet,"
99
                                 " use 'gnt-cluster init' first.")
100
    if self.REQ_MASTER:
101
      master = self.cfg.GetMasterNode()
102
      if master != utils.HostInfo().name:
103
        raise errors.OpPrereqError("Commands must be run on the master"
104
                                   " node %s" % master)
105

    
106
  def __GetSSH(self):
107
    """Returns the SshRunner object
108

109
    """
110
    if not self.__ssh:
111
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
112
    return self.__ssh
113

    
114
  ssh = property(fget=__GetSSH)
115

    
116
  def ExpandNames(self):
117
    """Expand names for this LU.
118

119
    This method is called before starting to execute the opcode, and it should
120
    update all the parameters of the opcode to their canonical form (e.g. a
121
    short node name must be fully expanded after this method has successfully
122
    completed). This way locking, hooks, logging, ecc. can work correctly.
123

124
    LUs which implement this method must also populate the self.needed_locks
125
    member, as a dict with lock levels as keys, and a list of needed lock names
126
    as values. Rules:
127
      - Use an empty dict if you don't need any lock
128
      - If you don't need any lock at a particular level omit that level
129
      - Don't put anything for the BGL level
130
      - If you want all locks at a level use locking.ALL_SET as a value
131

132
    If you need to share locks (rather than acquire them exclusively) at one
133
    level you can modify self.share_locks, setting a true value (usually 1) for
134
    that level. By default locks are not shared.
135

136
    Examples:
137
    # Acquire all nodes and one instance
138
    self.needed_locks = {
139
      locking.LEVEL_NODE: locking.ALL_SET,
140
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
141
    }
142
    # Acquire just two nodes
143
    self.needed_locks = {
144
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
145
    }
146
    # Acquire no locks
147
    self.needed_locks = {} # No, you can't leave it to the default value None
148

149
    """
150
    # The implementation of this method is mandatory only if the new LU is
151
    # concurrent, so that old LUs don't need to be changed all at the same
152
    # time.
153
    if self.REQ_BGL:
154
      self.needed_locks = {} # Exclusive LUs don't need locks.
155
    else:
156
      raise NotImplementedError
157

    
158
  def DeclareLocks(self, level):
159
    """Declare LU locking needs for a level
160

161
    While most LUs can just declare their locking needs at ExpandNames time,
162
    sometimes there's the need to calculate some locks after having acquired
163
    the ones before. This function is called just before acquiring locks at a
164
    particular level, but after acquiring the ones at lower levels, and permits
165
    such calculations. It can be used to modify self.needed_locks, and by
166
    default it does nothing.
167

168
    This function is only called if you have something already set in
169
    self.needed_locks for the level.
170

171
    @param level: Locking level which is going to be locked
172
    @type level: member of ganeti.locking.LEVELS
173

174
    """
175

    
176
  def CheckPrereq(self):
177
    """Check prerequisites for this LU.
178

179
    This method should check that the prerequisites for the execution
180
    of this LU are fulfilled. It can do internode communication, but
181
    it should be idempotent - no cluster or system changes are
182
    allowed.
183

184
    The method should raise errors.OpPrereqError in case something is
185
    not fulfilled. Its return value is ignored.
186

187
    This method should also update all the parameters of the opcode to
188
    their canonical form if it hasn't been done by ExpandNames before.
189

190
    """
191
    raise NotImplementedError
192

    
193
  def Exec(self, feedback_fn):
194
    """Execute the LU.
195

196
    This method should implement the actual work. It should raise
197
    errors.OpExecError for failures that are somewhat dealt with in
198
    code, or expected.
199

200
    """
201
    raise NotImplementedError
202

    
203
  def BuildHooksEnv(self):
204
    """Build hooks environment for this LU.
205

206
    This method should return a three-node tuple consisting of: a dict
207
    containing the environment that will be used for running the
208
    specific hook for this LU, a list of node names on which the hook
209
    should run before the execution, and a list of node names on which
210
    the hook should run after the execution.
211

212
    The keys of the dict must not have 'GANETI_' prefixed as this will
213
    be handled in the hooks runner. Also note additional keys will be
214
    added by the hooks runner. If the LU doesn't define any
215
    environment, an empty dict (and not None) should be returned.
216

217
    No nodes should be returned as an empty list (and not None).
218

219
    Note that if the HPATH for a LU class is None, this function will
220
    not be called.
221

222
    """
223
    raise NotImplementedError
224

    
225
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
226
    """Notify the LU about the results of its hooks.
227

228
    This method is called every time a hooks phase is executed, and notifies
229
    the Logical Unit about the hooks' result. The LU can then use it to alter
230
    its result based on the hooks.  By default the method does nothing and the
231
    previous result is passed back unchanged but any LU can define it if it
232
    wants to use the local cluster hook-scripts somehow.
233

234
    Args:
235
      phase: the hooks phase that has just been run
236
      hooks_results: the results of the multi-node hooks rpc call
237
      feedback_fn: function to send feedback back to the caller
238
      lu_result: the previous result this LU had, or None in the PRE phase.
239

240
    """
241
    return lu_result
242

    
243
  def _ExpandAndLockInstance(self):
244
    """Helper function to expand and lock an instance.
245

246
    Many LUs that work on an instance take its name in self.op.instance_name
247
    and need to expand it and then declare the expanded name for locking. This
248
    function does it, and then updates self.op.instance_name to the expanded
249
    name. It also initializes needed_locks as a dict, if this hasn't been done
250
    before.
251

252
    """
253
    if self.needed_locks is None:
254
      self.needed_locks = {}
255
    else:
256
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
257
        "_ExpandAndLockInstance called with instance-level locks set"
258
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
259
    if expanded_name is None:
260
      raise errors.OpPrereqError("Instance '%s' not known" %
261
                                  self.op.instance_name)
262
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
263
    self.op.instance_name = expanded_name
264

    
265
  def _LockInstancesNodes(self, primary_only=False):
266
    """Helper function to declare instances' nodes for locking.
267

268
    This function should be called after locking one or more instances to lock
269
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
270
    with all primary or secondary nodes for instances already locked and
271
    present in self.needed_locks[locking.LEVEL_INSTANCE].
272

273
    It should be called from DeclareLocks, and for safety only works if
274
    self.recalculate_locks[locking.LEVEL_NODE] is set.
275

276
    In the future it may grow parameters to just lock some instance's nodes, or
277
    to just lock primaries or secondary nodes, if needed.
278

279
    If should be called in DeclareLocks in a way similar to:
280

281
    if level == locking.LEVEL_NODE:
282
      self._LockInstancesNodes()
283

284
    @type primary_only: boolean
285
    @param primary_only: only lock primary nodes of locked instances
286

287
    """
288
    assert locking.LEVEL_NODE in self.recalculate_locks, \
289
      "_LockInstancesNodes helper function called with no nodes to recalculate"
290

    
291
    # TODO: check if we're really been called with the instance locks held
292

    
293
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
294
    # future we might want to have different behaviors depending on the value
295
    # of self.recalculate_locks[locking.LEVEL_NODE]
296
    wanted_nodes = []
297
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
298
      instance = self.context.cfg.GetInstanceInfo(instance_name)
299
      wanted_nodes.append(instance.primary_node)
300
      if not primary_only:
301
        wanted_nodes.extend(instance.secondary_nodes)
302

    
303
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
304
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
305
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
306
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
307

    
308
    del self.recalculate_locks[locking.LEVEL_NODE]
309

    
310

    
311
class NoHooksLU(LogicalUnit):
312
  """Simple LU which runs no hooks.
313

314
  This LU is intended as a parent for other LogicalUnits which will
315
  run no hooks, in order to reduce duplicate code.
316

317
  """
318
  HPATH = None
319
  HTYPE = None
320

    
321

    
322
def _GetWantedNodes(lu, nodes):
323
  """Returns list of checked and expanded node names.
324

325
  Args:
326
    nodes: List of nodes (strings) or None for all
327

328
  """
329
  if not isinstance(nodes, list):
330
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
331

    
332
  if not nodes:
333
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
334
      " non-empty list of nodes whose name is to be expanded.")
335

    
336
  wanted = []
337
  for name in nodes:
338
    node = lu.cfg.ExpandNodeName(name)
339
    if node is None:
340
      raise errors.OpPrereqError("No such node name '%s'" % name)
341
    wanted.append(node)
342

    
343
  return utils.NiceSort(wanted)
344

    
345

    
346
def _GetWantedInstances(lu, instances):
347
  """Returns list of checked and expanded instance names.
348

349
  Args:
350
    instances: List of instances (strings) or None for all
351

352
  """
353
  if not isinstance(instances, list):
354
    raise errors.OpPrereqError("Invalid argument type 'instances'")
355

    
356
  if instances:
357
    wanted = []
358

    
359
    for name in instances:
360
      instance = lu.cfg.ExpandInstanceName(name)
361
      if instance is None:
362
        raise errors.OpPrereqError("No such instance name '%s'" % name)
363
      wanted.append(instance)
364

    
365
  else:
366
    wanted = lu.cfg.GetInstanceList()
367
  return utils.NiceSort(wanted)
368

    
369

    
370
def _CheckOutputFields(static, dynamic, selected):
371
  """Checks whether all selected fields are valid.
372

373
  Args:
374
    static: Static fields
375
    dynamic: Dynamic fields
376

377
  """
378
  static_fields = frozenset(static)
379
  dynamic_fields = frozenset(dynamic)
380

    
381
  all_fields = static_fields | dynamic_fields
382

    
383
  if not all_fields.issuperset(selected):
384
    raise errors.OpPrereqError("Unknown output fields selected: %s"
385
                               % ",".join(frozenset(selected).
386
                                          difference(all_fields)))
387

    
388

    
389
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
390
                          memory, vcpus, nics):
391
  """Builds instance related env variables for hooks from single variables.
392

393
  Args:
394
    secondary_nodes: List of secondary nodes as strings
395
  """
396
  env = {
397
    "OP_TARGET": name,
398
    "INSTANCE_NAME": name,
399
    "INSTANCE_PRIMARY": primary_node,
400
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
401
    "INSTANCE_OS_TYPE": os_type,
402
    "INSTANCE_STATUS": status,
403
    "INSTANCE_MEMORY": memory,
404
    "INSTANCE_VCPUS": vcpus,
405
  }
406

    
407
  if nics:
408
    nic_count = len(nics)
409
    for idx, (ip, bridge, mac) in enumerate(nics):
410
      if ip is None:
411
        ip = ""
412
      env["INSTANCE_NIC%d_IP" % idx] = ip
413
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
414
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
415
  else:
416
    nic_count = 0
417

    
418
  env["INSTANCE_NIC_COUNT"] = nic_count
419

    
420
  return env
421

    
422

    
423
def _BuildInstanceHookEnvByObject(instance, override=None):
424
  """Builds instance related env variables for hooks from an object.
425

426
  Args:
427
    instance: objects.Instance object of instance
428
    override: dict of values to override
429
  """
430
  args = {
431
    'name': instance.name,
432
    'primary_node': instance.primary_node,
433
    'secondary_nodes': instance.secondary_nodes,
434
    'os_type': instance.os,
435
    'status': instance.os,
436
    'memory': instance.memory,
437
    'vcpus': instance.vcpus,
438
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
439
  }
440
  if override:
441
    args.update(override)
442
  return _BuildInstanceHookEnv(**args)
443

    
444

    
445
def _CheckInstanceBridgesExist(instance):
446
  """Check that the brigdes needed by an instance exist.
447

448
  """
449
  # check bridges existance
450
  brlist = [nic.bridge for nic in instance.nics]
451
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
452
    raise errors.OpPrereqError("one or more target bridges %s does not"
453
                               " exist on destination node '%s'" %
454
                               (brlist, instance.primary_node))
455

    
456

    
457
class LUDestroyCluster(NoHooksLU):
458
  """Logical unit for destroying the cluster.
459

460
  """
461
  _OP_REQP = []
462

    
463
  def CheckPrereq(self):
464
    """Check prerequisites.
465

466
    This checks whether the cluster is empty.
467

468
    Any errors are signalled by raising errors.OpPrereqError.
469

470
    """
471
    master = self.cfg.GetMasterNode()
472

    
473
    nodelist = self.cfg.GetNodeList()
474
    if len(nodelist) != 1 or nodelist[0] != master:
475
      raise errors.OpPrereqError("There are still %d node(s) in"
476
                                 " this cluster." % (len(nodelist) - 1))
477
    instancelist = self.cfg.GetInstanceList()
478
    if instancelist:
479
      raise errors.OpPrereqError("There are still %d instance(s) in"
480
                                 " this cluster." % len(instancelist))
481

    
482
  def Exec(self, feedback_fn):
483
    """Destroys the cluster.
484

485
    """
486
    master = self.cfg.GetMasterNode()
487
    if not rpc.call_node_stop_master(master, False):
488
      raise errors.OpExecError("Could not disable the master role")
489
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
490
    utils.CreateBackup(priv_key)
491
    utils.CreateBackup(pub_key)
492
    return master
493

    
494

    
495
class LUVerifyCluster(LogicalUnit):
496
  """Verifies the cluster status.
497

498
  """
499
  HPATH = "cluster-verify"
500
  HTYPE = constants.HTYPE_CLUSTER
501
  _OP_REQP = ["skip_checks"]
502
  REQ_BGL = False
503

    
504
  def ExpandNames(self):
505
    self.needed_locks = {
506
      locking.LEVEL_NODE: locking.ALL_SET,
507
      locking.LEVEL_INSTANCE: locking.ALL_SET,
508
    }
509
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
510

    
511
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
512
                  remote_version, feedback_fn):
513
    """Run multiple tests against a node.
514

515
    Test list:
516
      - compares ganeti version
517
      - checks vg existance and size > 20G
518
      - checks config file checksum
519
      - checks ssh to other nodes
520

521
    Args:
522
      node: name of the node to check
523
      file_list: required list of files
524
      local_cksum: dictionary of local files and their checksums
525

526
    """
527
    # compares ganeti version
528
    local_version = constants.PROTOCOL_VERSION
529
    if not remote_version:
530
      feedback_fn("  - ERROR: connection to %s failed" % (node))
531
      return True
532

    
533
    if local_version != remote_version:
534
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
535
                      (local_version, node, remote_version))
536
      return True
537

    
538
    # checks vg existance and size > 20G
539

    
540
    bad = False
541
    if not vglist:
542
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
543
                      (node,))
544
      bad = True
545
    else:
546
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
547
                                            constants.MIN_VG_SIZE)
548
      if vgstatus:
549
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
550
        bad = True
551

    
552
    # checks config file checksum
553
    # checks ssh to any
554

    
555
    if 'filelist' not in node_result:
556
      bad = True
557
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
558
    else:
559
      remote_cksum = node_result['filelist']
560
      for file_name in file_list:
561
        if file_name not in remote_cksum:
562
          bad = True
563
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
564
        elif remote_cksum[file_name] != local_cksum[file_name]:
565
          bad = True
566
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
567

    
568
    if 'nodelist' not in node_result:
569
      bad = True
570
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
571
    else:
572
      if node_result['nodelist']:
573
        bad = True
574
        for node in node_result['nodelist']:
575
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
576
                          (node, node_result['nodelist'][node]))
577
    if 'node-net-test' not in node_result:
578
      bad = True
579
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
580
    else:
581
      if node_result['node-net-test']:
582
        bad = True
583
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
584
        for node in nlist:
585
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
586
                          (node, node_result['node-net-test'][node]))
587

    
588
    hyp_result = node_result.get('hypervisor', None)
589
    if isinstance(hyp_result, dict):
590
      for hv_name, hv_result in hyp_result.iteritems():
591
        if hv_result is not None:
592
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
593
                      (hv_name, hv_result))
594
    return bad
595

    
596
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
597
                      node_instance, feedback_fn):
598
    """Verify an instance.
599

600
    This function checks to see if the required block devices are
601
    available on the instance's node.
602

603
    """
604
    bad = False
605

    
606
    node_current = instanceconfig.primary_node
607

    
608
    node_vol_should = {}
609
    instanceconfig.MapLVsByNode(node_vol_should)
610

    
611
    for node in node_vol_should:
612
      for volume in node_vol_should[node]:
613
        if node not in node_vol_is or volume not in node_vol_is[node]:
614
          feedback_fn("  - ERROR: volume %s missing on node %s" %
615
                          (volume, node))
616
          bad = True
617

    
618
    if not instanceconfig.status == 'down':
619
      if (node_current not in node_instance or
620
          not instance in node_instance[node_current]):
621
        feedback_fn("  - ERROR: instance %s not running on node %s" %
622
                        (instance, node_current))
623
        bad = True
624

    
625
    for node in node_instance:
626
      if (not node == node_current):
627
        if instance in node_instance[node]:
628
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
629
                          (instance, node))
630
          bad = True
631

    
632
    return bad
633

    
634
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
635
    """Verify if there are any unknown volumes in the cluster.
636

637
    The .os, .swap and backup volumes are ignored. All other volumes are
638
    reported as unknown.
639

640
    """
641
    bad = False
642

    
643
    for node in node_vol_is:
644
      for volume in node_vol_is[node]:
645
        if node not in node_vol_should or volume not in node_vol_should[node]:
646
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
647
                      (volume, node))
648
          bad = True
649
    return bad
650

    
651
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
652
    """Verify the list of running instances.
653

654
    This checks what instances are running but unknown to the cluster.
655

656
    """
657
    bad = False
658
    for node in node_instance:
659
      for runninginstance in node_instance[node]:
660
        if runninginstance not in instancelist:
661
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
662
                          (runninginstance, node))
663
          bad = True
664
    return bad
665

    
666
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
667
    """Verify N+1 Memory Resilience.
668

669
    Check that if one single node dies we can still start all the instances it
670
    was primary for.
671

672
    """
673
    bad = False
674

    
675
    for node, nodeinfo in node_info.iteritems():
676
      # This code checks that every node which is now listed as secondary has
677
      # enough memory to host all instances it is supposed to should a single
678
      # other node in the cluster fail.
679
      # FIXME: not ready for failover to an arbitrary node
680
      # FIXME: does not support file-backed instances
681
      # WARNING: we currently take into account down instances as well as up
682
      # ones, considering that even if they're down someone might want to start
683
      # them even in the event of a node failure.
684
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
685
        needed_mem = 0
686
        for instance in instances:
687
          needed_mem += instance_cfg[instance].memory
688
        if nodeinfo['mfree'] < needed_mem:
689
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
690
                      " failovers should node %s fail" % (node, prinode))
691
          bad = True
692
    return bad
693

    
694
  def CheckPrereq(self):
695
    """Check prerequisites.
696

697
    Transform the list of checks we're going to skip into a set and check that
698
    all its members are valid.
699

700
    """
701
    self.skip_set = frozenset(self.op.skip_checks)
702
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
703
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
704

    
705
  def BuildHooksEnv(self):
706
    """Build hooks env.
707

708
    Cluster-Verify hooks just rone in the post phase and their failure makes
709
    the output be logged in the verify output and the verification to fail.
710

711
    """
712
    all_nodes = self.cfg.GetNodeList()
713
    # TODO: populate the environment with useful information for verify hooks
714
    env = {}
715
    return env, [], all_nodes
716

    
717
  def Exec(self, feedback_fn):
718
    """Verify integrity of cluster, performing various test on nodes.
719

720
    """
721
    bad = False
722
    feedback_fn("* Verifying global settings")
723
    for msg in self.cfg.VerifyConfig():
724
      feedback_fn("  - ERROR: %s" % msg)
725

    
726
    vg_name = self.cfg.GetVGName()
727
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
728
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
729
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
730
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
731
    i_non_redundant = [] # Non redundant instances
732
    node_volume = {}
733
    node_instance = {}
734
    node_info = {}
735
    instance_cfg = {}
736

    
737
    # FIXME: verify OS list
738
    # do local checksums
739
    file_names = []
740
    file_names.append(constants.SSL_CERT_FILE)
741
    file_names.append(constants.CLUSTER_CONF_FILE)
742
    local_checksums = utils.FingerprintFiles(file_names)
743

    
744
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
745
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
746
    all_instanceinfo = rpc.call_instance_list(nodelist, hypervisors)
747
    all_vglist = rpc.call_vg_list(nodelist)
748
    node_verify_param = {
749
      'filelist': file_names,
750
      'nodelist': nodelist,
751
      'hypervisor': hypervisors,
752
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
753
                        for node in nodeinfo]
754
      }
755
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param,
756
                                      self.cfg.GetClusterName())
757
    all_rversion = rpc.call_version(nodelist)
758
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName(),
759
                                   self.cfg.GetHypervisorType())
760

    
761
    for node in nodelist:
762
      feedback_fn("* Verifying node %s" % node)
763
      result = self._VerifyNode(node, file_names, local_checksums,
764
                                all_vglist[node], all_nvinfo[node],
765
                                all_rversion[node], feedback_fn)
766
      bad = bad or result
767

    
768
      # node_volume
769
      volumeinfo = all_volumeinfo[node]
770

    
771
      if isinstance(volumeinfo, basestring):
772
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
773
                    (node, volumeinfo[-400:].encode('string_escape')))
774
        bad = True
775
        node_volume[node] = {}
776
      elif not isinstance(volumeinfo, dict):
777
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
778
        bad = True
779
        continue
780
      else:
781
        node_volume[node] = volumeinfo
782

    
783
      # node_instance
784
      nodeinstance = all_instanceinfo[node]
785
      if type(nodeinstance) != list:
786
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
787
        bad = True
788
        continue
789

    
790
      node_instance[node] = nodeinstance
791

    
792
      # node_info
793
      nodeinfo = all_ninfo[node]
794
      if not isinstance(nodeinfo, dict):
795
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
796
        bad = True
797
        continue
798

    
799
      try:
800
        node_info[node] = {
801
          "mfree": int(nodeinfo['memory_free']),
802
          "dfree": int(nodeinfo['vg_free']),
803
          "pinst": [],
804
          "sinst": [],
805
          # dictionary holding all instances this node is secondary for,
806
          # grouped by their primary node. Each key is a cluster node, and each
807
          # value is a list of instances which have the key as primary and the
808
          # current node as secondary.  this is handy to calculate N+1 memory
809
          # availability if you can only failover from a primary to its
810
          # secondary.
811
          "sinst-by-pnode": {},
812
        }
813
      except ValueError:
814
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
815
        bad = True
816
        continue
817

    
818
    node_vol_should = {}
819

    
820
    for instance in instancelist:
821
      feedback_fn("* Verifying instance %s" % instance)
822
      inst_config = self.cfg.GetInstanceInfo(instance)
823
      result =  self._VerifyInstance(instance, inst_config, node_volume,
824
                                     node_instance, feedback_fn)
825
      bad = bad or result
826

    
827
      inst_config.MapLVsByNode(node_vol_should)
828

    
829
      instance_cfg[instance] = inst_config
830

    
831
      pnode = inst_config.primary_node
832
      if pnode in node_info:
833
        node_info[pnode]['pinst'].append(instance)
834
      else:
835
        feedback_fn("  - ERROR: instance %s, connection to primary node"
836
                    " %s failed" % (instance, pnode))
837
        bad = True
838

    
839
      # If the instance is non-redundant we cannot survive losing its primary
840
      # node, so we are not N+1 compliant. On the other hand we have no disk
841
      # templates with more than one secondary so that situation is not well
842
      # supported either.
843
      # FIXME: does not support file-backed instances
844
      if len(inst_config.secondary_nodes) == 0:
845
        i_non_redundant.append(instance)
846
      elif len(inst_config.secondary_nodes) > 1:
847
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
848
                    % instance)
849

    
850
      for snode in inst_config.secondary_nodes:
851
        if snode in node_info:
852
          node_info[snode]['sinst'].append(instance)
853
          if pnode not in node_info[snode]['sinst-by-pnode']:
854
            node_info[snode]['sinst-by-pnode'][pnode] = []
855
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
856
        else:
857
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
858
                      " %s failed" % (instance, snode))
859

    
860
    feedback_fn("* Verifying orphan volumes")
861
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
862
                                       feedback_fn)
863
    bad = bad or result
864

    
865
    feedback_fn("* Verifying remaining instances")
866
    result = self._VerifyOrphanInstances(instancelist, node_instance,
867
                                         feedback_fn)
868
    bad = bad or result
869

    
870
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
871
      feedback_fn("* Verifying N+1 Memory redundancy")
872
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
873
      bad = bad or result
874

    
875
    feedback_fn("* Other Notes")
876
    if i_non_redundant:
877
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
878
                  % len(i_non_redundant))
879

    
880
    return not bad
881

    
882
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
883
    """Analize the post-hooks' result, handle it, and send some
884
    nicely-formatted feedback back to the user.
885

886
    Args:
887
      phase: the hooks phase that has just been run
888
      hooks_results: the results of the multi-node hooks rpc call
889
      feedback_fn: function to send feedback back to the caller
890
      lu_result: previous Exec result
891

892
    """
893
    # We only really run POST phase hooks, and are only interested in
894
    # their results
895
    if phase == constants.HOOKS_PHASE_POST:
896
      # Used to change hooks' output to proper indentation
897
      indent_re = re.compile('^', re.M)
898
      feedback_fn("* Hooks Results")
899
      if not hooks_results:
900
        feedback_fn("  - ERROR: general communication failure")
901
        lu_result = 1
902
      else:
903
        for node_name in hooks_results:
904
          show_node_header = True
905
          res = hooks_results[node_name]
906
          if res is False or not isinstance(res, list):
907
            feedback_fn("    Communication failure")
908
            lu_result = 1
909
            continue
910
          for script, hkr, output in res:
911
            if hkr == constants.HKR_FAIL:
912
              # The node header is only shown once, if there are
913
              # failing hooks on that node
914
              if show_node_header:
915
                feedback_fn("  Node %s:" % node_name)
916
                show_node_header = False
917
              feedback_fn("    ERROR: Script %s failed, output:" % script)
918
              output = indent_re.sub('      ', output)
919
              feedback_fn("%s" % output)
920
              lu_result = 1
921

    
922
      return lu_result
923

    
924

    
925
class LUVerifyDisks(NoHooksLU):
926
  """Verifies the cluster disks status.
927

928
  """
929
  _OP_REQP = []
930
  REQ_BGL = False
931

    
932
  def ExpandNames(self):
933
    self.needed_locks = {
934
      locking.LEVEL_NODE: locking.ALL_SET,
935
      locking.LEVEL_INSTANCE: locking.ALL_SET,
936
    }
937
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
938

    
939
  def CheckPrereq(self):
940
    """Check prerequisites.
941

942
    This has no prerequisites.
943

944
    """
945
    pass
946

    
947
  def Exec(self, feedback_fn):
948
    """Verify integrity of cluster disks.
949

950
    """
951
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
952

    
953
    vg_name = self.cfg.GetVGName()
954
    nodes = utils.NiceSort(self.cfg.GetNodeList())
955
    instances = [self.cfg.GetInstanceInfo(name)
956
                 for name in self.cfg.GetInstanceList()]
957

    
958
    nv_dict = {}
959
    for inst in instances:
960
      inst_lvs = {}
961
      if (inst.status != "up" or
962
          inst.disk_template not in constants.DTS_NET_MIRROR):
963
        continue
964
      inst.MapLVsByNode(inst_lvs)
965
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
966
      for node, vol_list in inst_lvs.iteritems():
967
        for vol in vol_list:
968
          nv_dict[(node, vol)] = inst
969

    
970
    if not nv_dict:
971
      return result
972

    
973
    node_lvs = rpc.call_volume_list(nodes, vg_name)
974

    
975
    to_act = set()
976
    for node in nodes:
977
      # node_volume
978
      lvs = node_lvs[node]
979

    
980
      if isinstance(lvs, basestring):
981
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
982
        res_nlvm[node] = lvs
983
      elif not isinstance(lvs, dict):
984
        logger.Info("connection to node %s failed or invalid data returned" %
985
                    (node,))
986
        res_nodes.append(node)
987
        continue
988

    
989
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
990
        inst = nv_dict.pop((node, lv_name), None)
991
        if (not lv_online and inst is not None
992
            and inst.name not in res_instances):
993
          res_instances.append(inst.name)
994

    
995
    # any leftover items in nv_dict are missing LVs, let's arrange the
996
    # data better
997
    for key, inst in nv_dict.iteritems():
998
      if inst.name not in res_missing:
999
        res_missing[inst.name] = []
1000
      res_missing[inst.name].append(key)
1001

    
1002
    return result
1003

    
1004

    
1005
class LURenameCluster(LogicalUnit):
1006
  """Rename the cluster.
1007

1008
  """
1009
  HPATH = "cluster-rename"
1010
  HTYPE = constants.HTYPE_CLUSTER
1011
  _OP_REQP = ["name"]
1012

    
1013
  def BuildHooksEnv(self):
1014
    """Build hooks env.
1015

1016
    """
1017
    env = {
1018
      "OP_TARGET": self.cfg.GetClusterName(),
1019
      "NEW_NAME": self.op.name,
1020
      }
1021
    mn = self.cfg.GetMasterNode()
1022
    return env, [mn], [mn]
1023

    
1024
  def CheckPrereq(self):
1025
    """Verify that the passed name is a valid one.
1026

1027
    """
1028
    hostname = utils.HostInfo(self.op.name)
1029

    
1030
    new_name = hostname.name
1031
    self.ip = new_ip = hostname.ip
1032
    old_name = self.cfg.GetClusterName()
1033
    old_ip = self.cfg.GetMasterIP()
1034
    if new_name == old_name and new_ip == old_ip:
1035
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1036
                                 " cluster has changed")
1037
    if new_ip != old_ip:
1038
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1039
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1040
                                   " reachable on the network. Aborting." %
1041
                                   new_ip)
1042

    
1043
    self.op.name = new_name
1044

    
1045
  def Exec(self, feedback_fn):
1046
    """Rename the cluster.
1047

1048
    """
1049
    clustername = self.op.name
1050
    ip = self.ip
1051

    
1052
    # shutdown the master IP
1053
    master = self.cfg.GetMasterNode()
1054
    if not rpc.call_node_stop_master(master, False):
1055
      raise errors.OpExecError("Could not disable the master role")
1056

    
1057
    try:
1058
      # modify the sstore
1059
      # TODO: sstore
1060
      ss.SetKey(ss.SS_MASTER_IP, ip)
1061
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1062

    
1063
      # Distribute updated ss config to all nodes
1064
      myself = self.cfg.GetNodeInfo(master)
1065
      dist_nodes = self.cfg.GetNodeList()
1066
      if myself.name in dist_nodes:
1067
        dist_nodes.remove(myself.name)
1068

    
1069
      logger.Debug("Copying updated ssconf data to all nodes")
1070
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1071
        fname = ss.KeyToFilename(keyname)
1072
        result = rpc.call_upload_file(dist_nodes, fname)
1073
        for to_node in dist_nodes:
1074
          if not result[to_node]:
1075
            logger.Error("copy of file %s to node %s failed" %
1076
                         (fname, to_node))
1077
    finally:
1078
      if not rpc.call_node_start_master(master, False):
1079
        logger.Error("Could not re-enable the master role on the master,"
1080
                     " please restart manually.")
1081

    
1082

    
1083
def _RecursiveCheckIfLVMBased(disk):
1084
  """Check if the given disk or its children are lvm-based.
1085

1086
  Args:
1087
    disk: ganeti.objects.Disk object
1088

1089
  Returns:
1090
    boolean indicating whether a LD_LV dev_type was found or not
1091

1092
  """
1093
  if disk.children:
1094
    for chdisk in disk.children:
1095
      if _RecursiveCheckIfLVMBased(chdisk):
1096
        return True
1097
  return disk.dev_type == constants.LD_LV
1098

    
1099

    
1100
class LUSetClusterParams(LogicalUnit):
1101
  """Change the parameters of the cluster.
1102

1103
  """
1104
  HPATH = "cluster-modify"
1105
  HTYPE = constants.HTYPE_CLUSTER
1106
  _OP_REQP = []
1107
  REQ_BGL = False
1108

    
1109
  def ExpandNames(self):
1110
    # FIXME: in the future maybe other cluster params won't require checking on
1111
    # all nodes to be modified.
1112
    self.needed_locks = {
1113
      locking.LEVEL_NODE: locking.ALL_SET,
1114
    }
1115
    self.share_locks[locking.LEVEL_NODE] = 1
1116

    
1117
  def BuildHooksEnv(self):
1118
    """Build hooks env.
1119

1120
    """
1121
    env = {
1122
      "OP_TARGET": self.cfg.GetClusterName(),
1123
      "NEW_VG_NAME": self.op.vg_name,
1124
      }
1125
    mn = self.cfg.GetMasterNode()
1126
    return env, [mn], [mn]
1127

    
1128
  def CheckPrereq(self):
1129
    """Check prerequisites.
1130

1131
    This checks whether the given params don't conflict and
1132
    if the given volume group is valid.
1133

1134
    """
1135
    # FIXME: This only works because there is only one parameter that can be
1136
    # changed or removed.
1137
    if not self.op.vg_name:
1138
      instances = self.cfg.GetAllInstancesInfo().values()
1139
      for inst in instances:
1140
        for disk in inst.disks:
1141
          if _RecursiveCheckIfLVMBased(disk):
1142
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1143
                                       " lvm-based instances exist")
1144

    
1145
    # if vg_name not None, checks given volume group on all nodes
1146
    if self.op.vg_name:
1147
      node_list = self.acquired_locks[locking.LEVEL_NODE]
1148
      vglist = rpc.call_vg_list(node_list)
1149
      for node in node_list:
1150
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1151
                                              constants.MIN_VG_SIZE)
1152
        if vgstatus:
1153
          raise errors.OpPrereqError("Error on node '%s': %s" %
1154
                                     (node, vgstatus))
1155

    
1156
  def Exec(self, feedback_fn):
1157
    """Change the parameters of the cluster.
1158

1159
    """
1160
    if self.op.vg_name != self.cfg.GetVGName():
1161
      self.cfg.SetVGName(self.op.vg_name)
1162
    else:
1163
      feedback_fn("Cluster LVM configuration already in desired"
1164
                  " state, not changing")
1165

    
1166

    
1167
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1168
  """Sleep and poll for an instance's disk to sync.
1169

1170
  """
1171
  if not instance.disks:
1172
    return True
1173

    
1174
  if not oneshot:
1175
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1176

    
1177
  node = instance.primary_node
1178

    
1179
  for dev in instance.disks:
1180
    cfgw.SetDiskID(dev, node)
1181

    
1182
  retries = 0
1183
  while True:
1184
    max_time = 0
1185
    done = True
1186
    cumul_degraded = False
1187
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1188
    if not rstats:
1189
      proc.LogWarning("Can't get any data from node %s" % node)
1190
      retries += 1
1191
      if retries >= 10:
1192
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1193
                                 " aborting." % node)
1194
      time.sleep(6)
1195
      continue
1196
    retries = 0
1197
    for i in range(len(rstats)):
1198
      mstat = rstats[i]
1199
      if mstat is None:
1200
        proc.LogWarning("Can't compute data for node %s/%s" %
1201
                        (node, instance.disks[i].iv_name))
1202
        continue
1203
      # we ignore the ldisk parameter
1204
      perc_done, est_time, is_degraded, _ = mstat
1205
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1206
      if perc_done is not None:
1207
        done = False
1208
        if est_time is not None:
1209
          rem_time = "%d estimated seconds remaining" % est_time
1210
          max_time = est_time
1211
        else:
1212
          rem_time = "no time estimate"
1213
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
1214
                     (instance.disks[i].iv_name, perc_done, rem_time))
1215
    if done or oneshot:
1216
      break
1217

    
1218
    time.sleep(min(60, max_time))
1219

    
1220
  if done:
1221
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1222
  return not cumul_degraded
1223

    
1224

    
1225
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1226
  """Check that mirrors are not degraded.
1227

1228
  The ldisk parameter, if True, will change the test from the
1229
  is_degraded attribute (which represents overall non-ok status for
1230
  the device(s)) to the ldisk (representing the local storage status).
1231

1232
  """
1233
  cfgw.SetDiskID(dev, node)
1234
  if ldisk:
1235
    idx = 6
1236
  else:
1237
    idx = 5
1238

    
1239
  result = True
1240
  if on_primary or dev.AssembleOnSecondary():
1241
    rstats = rpc.call_blockdev_find(node, dev)
1242
    if not rstats:
1243
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1244
      result = False
1245
    else:
1246
      result = result and (not rstats[idx])
1247
  if dev.children:
1248
    for child in dev.children:
1249
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1250

    
1251
  return result
1252

    
1253

    
1254
class LUDiagnoseOS(NoHooksLU):
1255
  """Logical unit for OS diagnose/query.
1256

1257
  """
1258
  _OP_REQP = ["output_fields", "names"]
1259
  REQ_BGL = False
1260

    
1261
  def ExpandNames(self):
1262
    if self.op.names:
1263
      raise errors.OpPrereqError("Selective OS query not supported")
1264

    
1265
    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1266
    _CheckOutputFields(static=[],
1267
                       dynamic=self.dynamic_fields,
1268
                       selected=self.op.output_fields)
1269

    
1270
    # Lock all nodes, in shared mode
1271
    self.needed_locks = {}
1272
    self.share_locks[locking.LEVEL_NODE] = 1
1273
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1274

    
1275
  def CheckPrereq(self):
1276
    """Check prerequisites.
1277

1278
    """
1279

    
1280
  @staticmethod
1281
  def _DiagnoseByOS(node_list, rlist):
1282
    """Remaps a per-node return list into an a per-os per-node dictionary
1283

1284
      Args:
1285
        node_list: a list with the names of all nodes
1286
        rlist: a map with node names as keys and OS objects as values
1287

1288
      Returns:
1289
        map: a map with osnames as keys and as value another map, with
1290
             nodes as
1291
             keys and list of OS objects as values
1292
             e.g. {"debian-etch": {"node1": [<object>,...],
1293
                                   "node2": [<object>,]}
1294
                  }
1295

1296
    """
1297
    all_os = {}
1298
    for node_name, nr in rlist.iteritems():
1299
      if not nr:
1300
        continue
1301
      for os_obj in nr:
1302
        if os_obj.name not in all_os:
1303
          # build a list of nodes for this os containing empty lists
1304
          # for each node in node_list
1305
          all_os[os_obj.name] = {}
1306
          for nname in node_list:
1307
            all_os[os_obj.name][nname] = []
1308
        all_os[os_obj.name][node_name].append(os_obj)
1309
    return all_os
1310

    
1311
  def Exec(self, feedback_fn):
1312
    """Compute the list of OSes.
1313

1314
    """
1315
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1316
    node_data = rpc.call_os_diagnose(node_list)
1317
    if node_data == False:
1318
      raise errors.OpExecError("Can't gather the list of OSes")
1319
    pol = self._DiagnoseByOS(node_list, node_data)
1320
    output = []
1321
    for os_name, os_data in pol.iteritems():
1322
      row = []
1323
      for field in self.op.output_fields:
1324
        if field == "name":
1325
          val = os_name
1326
        elif field == "valid":
1327
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1328
        elif field == "node_status":
1329
          val = {}
1330
          for node_name, nos_list in os_data.iteritems():
1331
            val[node_name] = [(v.status, v.path) for v in nos_list]
1332
        else:
1333
          raise errors.ParameterError(field)
1334
        row.append(val)
1335
      output.append(row)
1336

    
1337
    return output
1338

    
1339

    
1340
class LURemoveNode(LogicalUnit):
1341
  """Logical unit for removing a node.
1342

1343
  """
1344
  HPATH = "node-remove"
1345
  HTYPE = constants.HTYPE_NODE
1346
  _OP_REQP = ["node_name"]
1347

    
1348
  def BuildHooksEnv(self):
1349
    """Build hooks env.
1350

1351
    This doesn't run on the target node in the pre phase as a failed
1352
    node would then be impossible to remove.
1353

1354
    """
1355
    env = {
1356
      "OP_TARGET": self.op.node_name,
1357
      "NODE_NAME": self.op.node_name,
1358
      }
1359
    all_nodes = self.cfg.GetNodeList()
1360
    all_nodes.remove(self.op.node_name)
1361
    return env, all_nodes, all_nodes
1362

    
1363
  def CheckPrereq(self):
1364
    """Check prerequisites.
1365

1366
    This checks:
1367
     - the node exists in the configuration
1368
     - it does not have primary or secondary instances
1369
     - it's not the master
1370

1371
    Any errors are signalled by raising errors.OpPrereqError.
1372

1373
    """
1374
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1375
    if node is None:
1376
      raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1377

    
1378
    instance_list = self.cfg.GetInstanceList()
1379

    
1380
    masternode = self.cfg.GetMasterNode()
1381
    if node.name == masternode:
1382
      raise errors.OpPrereqError("Node is the master node,"
1383
                                 " you need to failover first.")
1384

    
1385
    for instance_name in instance_list:
1386
      instance = self.cfg.GetInstanceInfo(instance_name)
1387
      if node.name == instance.primary_node:
1388
        raise errors.OpPrereqError("Instance %s still running on the node,"
1389
                                   " please remove first." % instance_name)
1390
      if node.name in instance.secondary_nodes:
1391
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1392
                                   " please remove first." % instance_name)
1393
    self.op.node_name = node.name
1394
    self.node = node
1395

    
1396
  def Exec(self, feedback_fn):
1397
    """Removes the node from the cluster.
1398

1399
    """
1400
    node = self.node
1401
    logger.Info("stopping the node daemon and removing configs from node %s" %
1402
                node.name)
1403

    
1404
    self.context.RemoveNode(node.name)
1405

    
1406
    rpc.call_node_leave_cluster(node.name)
1407

    
1408

    
1409
class LUQueryNodes(NoHooksLU):
1410
  """Logical unit for querying nodes.
1411

1412
  """
1413
  _OP_REQP = ["output_fields", "names"]
1414
  REQ_BGL = False
1415

    
1416
  def ExpandNames(self):
1417
    self.dynamic_fields = frozenset([
1418
      "dtotal", "dfree",
1419
      "mtotal", "mnode", "mfree",
1420
      "bootid",
1421
      "ctotal",
1422
      ])
1423

    
1424
    self.static_fields = frozenset([
1425
      "name", "pinst_cnt", "sinst_cnt",
1426
      "pinst_list", "sinst_list",
1427
      "pip", "sip", "tags",
1428
      "serial_no",
1429
      ])
1430

    
1431
    _CheckOutputFields(static=self.static_fields,
1432
                       dynamic=self.dynamic_fields,
1433
                       selected=self.op.output_fields)
1434

    
1435
    self.needed_locks = {}
1436
    self.share_locks[locking.LEVEL_NODE] = 1
1437

    
1438
    if self.op.names:
1439
      self.wanted = _GetWantedNodes(self, self.op.names)
1440
    else:
1441
      self.wanted = locking.ALL_SET
1442

    
1443
    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
1444
    if self.do_locking:
1445
      # if we don't request only static fields, we need to lock the nodes
1446
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
1447

    
1448

    
1449
  def CheckPrereq(self):
1450
    """Check prerequisites.
1451

1452
    """
1453
    # The validation of the node list is done in the _GetWantedNodes,
1454
    # if non empty, and if empty, there's no validation to do
1455
    pass
1456

    
1457
  def Exec(self, feedback_fn):
1458
    """Computes the list of nodes and their attributes.
1459

1460
    """
1461
    all_info = self.cfg.GetAllNodesInfo()
1462
    if self.do_locking:
1463
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1464
    elif self.wanted != locking.ALL_SET:
1465
      nodenames = self.wanted
1466
      missing = set(nodenames).difference(all_info.keys())
1467
      if missing:
1468
        raise self.OpExecError(
1469
          "Some nodes were removed before retrieving their data: %s" % missing)
1470
    else:
1471
      nodenames = all_info.keys()
1472
    nodelist = [all_info[name] for name in nodenames]
1473

    
1474
    # begin data gathering
1475

    
1476
    if self.dynamic_fields.intersection(self.op.output_fields):
1477
      live_data = {}
1478
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1479
                                     self.cfg.GetHypervisorType())
1480
      for name in nodenames:
1481
        nodeinfo = node_data.get(name, None)
1482
        if nodeinfo:
1483
          live_data[name] = {
1484
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1485
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1486
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1487
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1488
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1489
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1490
            "bootid": nodeinfo['bootid'],
1491
            }
1492
        else:
1493
          live_data[name] = {}
1494
    else:
1495
      live_data = dict.fromkeys(nodenames, {})
1496

    
1497
    node_to_primary = dict([(name, set()) for name in nodenames])
1498
    node_to_secondary = dict([(name, set()) for name in nodenames])
1499

    
1500
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1501
                             "sinst_cnt", "sinst_list"))
1502
    if inst_fields & frozenset(self.op.output_fields):
1503
      instancelist = self.cfg.GetInstanceList()
1504

    
1505
      for instance_name in instancelist:
1506
        inst = self.cfg.GetInstanceInfo(instance_name)
1507
        if inst.primary_node in node_to_primary:
1508
          node_to_primary[inst.primary_node].add(inst.name)
1509
        for secnode in inst.secondary_nodes:
1510
          if secnode in node_to_secondary:
1511
            node_to_secondary[secnode].add(inst.name)
1512

    
1513
    # end data gathering
1514

    
1515
    output = []
1516
    for node in nodelist:
1517
      node_output = []
1518
      for field in self.op.output_fields:
1519
        if field == "name":
1520
          val = node.name
1521
        elif field == "pinst_list":
1522
          val = list(node_to_primary[node.name])
1523
        elif field == "sinst_list":
1524
          val = list(node_to_secondary[node.name])
1525
        elif field == "pinst_cnt":
1526
          val = len(node_to_primary[node.name])
1527
        elif field == "sinst_cnt":
1528
          val = len(node_to_secondary[node.name])
1529
        elif field == "pip":
1530
          val = node.primary_ip
1531
        elif field == "sip":
1532
          val = node.secondary_ip
1533
        elif field == "tags":
1534
          val = list(node.GetTags())
1535
        elif field == "serial_no":
1536
          val = node.serial_no
1537
        elif field in self.dynamic_fields:
1538
          val = live_data[node.name].get(field, None)
1539
        else:
1540
          raise errors.ParameterError(field)
1541
        node_output.append(val)
1542
      output.append(node_output)
1543

    
1544
    return output
1545

    
1546

    
1547
class LUQueryNodeVolumes(NoHooksLU):
1548
  """Logical unit for getting volumes on node(s).
1549

1550
  """
1551
  _OP_REQP = ["nodes", "output_fields"]
1552
  REQ_BGL = False
1553

    
1554
  def ExpandNames(self):
1555
    _CheckOutputFields(static=["node"],
1556
                       dynamic=["phys", "vg", "name", "size", "instance"],
1557
                       selected=self.op.output_fields)
1558

    
1559
    self.needed_locks = {}
1560
    self.share_locks[locking.LEVEL_NODE] = 1
1561
    if not self.op.nodes:
1562
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1563
    else:
1564
      self.needed_locks[locking.LEVEL_NODE] = \
1565
        _GetWantedNodes(self, self.op.nodes)
1566

    
1567
  def CheckPrereq(self):
1568
    """Check prerequisites.
1569

1570
    This checks that the fields required are valid output fields.
1571

1572
    """
1573
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1574

    
1575
  def Exec(self, feedback_fn):
1576
    """Computes the list of nodes and their attributes.
1577

1578
    """
1579
    nodenames = self.nodes
1580
    volumes = rpc.call_node_volumes(nodenames)
1581

    
1582
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1583
             in self.cfg.GetInstanceList()]
1584

    
1585
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1586

    
1587
    output = []
1588
    for node in nodenames:
1589
      if node not in volumes or not volumes[node]:
1590
        continue
1591

    
1592
      node_vols = volumes[node][:]
1593
      node_vols.sort(key=lambda vol: vol['dev'])
1594

    
1595
      for vol in node_vols:
1596
        node_output = []
1597
        for field in self.op.output_fields:
1598
          if field == "node":
1599
            val = node
1600
          elif field == "phys":
1601
            val = vol['dev']
1602
          elif field == "vg":
1603
            val = vol['vg']
1604
          elif field == "name":
1605
            val = vol['name']
1606
          elif field == "size":
1607
            val = int(float(vol['size']))
1608
          elif field == "instance":
1609
            for inst in ilist:
1610
              if node not in lv_by_node[inst]:
1611
                continue
1612
              if vol['name'] in lv_by_node[inst][node]:
1613
                val = inst.name
1614
                break
1615
            else:
1616
              val = '-'
1617
          else:
1618
            raise errors.ParameterError(field)
1619
          node_output.append(str(val))
1620

    
1621
        output.append(node_output)
1622

    
1623
    return output
1624

    
1625

    
1626
class LUAddNode(LogicalUnit):
1627
  """Logical unit for adding node to the cluster.
1628

1629
  """
1630
  HPATH = "node-add"
1631
  HTYPE = constants.HTYPE_NODE
1632
  _OP_REQP = ["node_name"]
1633

    
1634
  def BuildHooksEnv(self):
1635
    """Build hooks env.
1636

1637
    This will run on all nodes before, and on all nodes + the new node after.
1638

1639
    """
1640
    env = {
1641
      "OP_TARGET": self.op.node_name,
1642
      "NODE_NAME": self.op.node_name,
1643
      "NODE_PIP": self.op.primary_ip,
1644
      "NODE_SIP": self.op.secondary_ip,
1645
      }
1646
    nodes_0 = self.cfg.GetNodeList()
1647
    nodes_1 = nodes_0 + [self.op.node_name, ]
1648
    return env, nodes_0, nodes_1
1649

    
1650
  def CheckPrereq(self):
1651
    """Check prerequisites.
1652

1653
    This checks:
1654
     - the new node is not already in the config
1655
     - it is resolvable
1656
     - its parameters (single/dual homed) matches the cluster
1657

1658
    Any errors are signalled by raising errors.OpPrereqError.
1659

1660
    """
1661
    node_name = self.op.node_name
1662
    cfg = self.cfg
1663

    
1664
    dns_data = utils.HostInfo(node_name)
1665

    
1666
    node = dns_data.name
1667
    primary_ip = self.op.primary_ip = dns_data.ip
1668
    secondary_ip = getattr(self.op, "secondary_ip", None)
1669
    if secondary_ip is None:
1670
      secondary_ip = primary_ip
1671
    if not utils.IsValidIP(secondary_ip):
1672
      raise errors.OpPrereqError("Invalid secondary IP given")
1673
    self.op.secondary_ip = secondary_ip
1674

    
1675
    node_list = cfg.GetNodeList()
1676
    if not self.op.readd and node in node_list:
1677
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1678
                                 node)
1679
    elif self.op.readd and node not in node_list:
1680
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1681

    
1682
    for existing_node_name in node_list:
1683
      existing_node = cfg.GetNodeInfo(existing_node_name)
1684

    
1685
      if self.op.readd and node == existing_node_name:
1686
        if (existing_node.primary_ip != primary_ip or
1687
            existing_node.secondary_ip != secondary_ip):
1688
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1689
                                     " address configuration as before")
1690
        continue
1691

    
1692
      if (existing_node.primary_ip == primary_ip or
1693
          existing_node.secondary_ip == primary_ip or
1694
          existing_node.primary_ip == secondary_ip or
1695
          existing_node.secondary_ip == secondary_ip):
1696
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1697
                                   " existing node %s" % existing_node.name)
1698

    
1699
    # check that the type of the node (single versus dual homed) is the
1700
    # same as for the master
1701
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1702
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1703
    newbie_singlehomed = secondary_ip == primary_ip
1704
    if master_singlehomed != newbie_singlehomed:
1705
      if master_singlehomed:
1706
        raise errors.OpPrereqError("The master has no private ip but the"
1707
                                   " new node has one")
1708
      else:
1709
        raise errors.OpPrereqError("The master has a private ip but the"
1710
                                   " new node doesn't have one")
1711

    
1712
    # checks reachablity
1713
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1714
      raise errors.OpPrereqError("Node not reachable by ping")
1715

    
1716
    if not newbie_singlehomed:
1717
      # check reachability from my secondary ip to newbie's secondary ip
1718
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1719
                           source=myself.secondary_ip):
1720
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1721
                                   " based ping to noded port")
1722

    
1723
    self.new_node = objects.Node(name=node,
1724
                                 primary_ip=primary_ip,
1725
                                 secondary_ip=secondary_ip)
1726

    
1727
  def Exec(self, feedback_fn):
1728
    """Adds the new node to the cluster.
1729

1730
    """
1731
    new_node = self.new_node
1732
    node = new_node.name
1733

    
1734
    # check connectivity
1735
    result = rpc.call_version([node])[node]
1736
    if result:
1737
      if constants.PROTOCOL_VERSION == result:
1738
        logger.Info("communication to node %s fine, sw version %s match" %
1739
                    (node, result))
1740
      else:
1741
        raise errors.OpExecError("Version mismatch master version %s,"
1742
                                 " node version %s" %
1743
                                 (constants.PROTOCOL_VERSION, result))
1744
    else:
1745
      raise errors.OpExecError("Cannot get version from the new node")
1746

    
1747
    # setup ssh on node
1748
    logger.Info("copy ssh key to node %s" % node)
1749
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1750
    keyarray = []
1751
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1752
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1753
                priv_key, pub_key]
1754

    
1755
    for i in keyfiles:
1756
      f = open(i, 'r')
1757
      try:
1758
        keyarray.append(f.read())
1759
      finally:
1760
        f.close()
1761

    
1762
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1763
                               keyarray[3], keyarray[4], keyarray[5])
1764

    
1765
    if not result:
1766
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1767

    
1768
    # Add node to our /etc/hosts, and add key to known_hosts
1769
    utils.AddHostToEtcHosts(new_node.name)
1770

    
1771
    if new_node.secondary_ip != new_node.primary_ip:
1772
      if not rpc.call_node_tcp_ping(new_node.name,
1773
                                    constants.LOCALHOST_IP_ADDRESS,
1774
                                    new_node.secondary_ip,
1775
                                    constants.DEFAULT_NODED_PORT,
1776
                                    10, False):
1777
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1778
                                 " you gave (%s). Please fix and re-run this"
1779
                                 " command." % new_node.secondary_ip)
1780

    
1781
    node_verify_list = [self.cfg.GetMasterNode()]
1782
    node_verify_param = {
1783
      'nodelist': [node],
1784
      # TODO: do a node-net-test as well?
1785
    }
1786

    
1787
    result = rpc.call_node_verify(node_verify_list, node_verify_param,
1788
                                  self.cfg.GetClusterName())
1789
    for verifier in node_verify_list:
1790
      if not result[verifier]:
1791
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
1792
                                 " for remote verification" % verifier)
1793
      if result[verifier]['nodelist']:
1794
        for failed in result[verifier]['nodelist']:
1795
          feedback_fn("ssh/hostname verification failed %s -> %s" %
1796
                      (verifier, result[verifier]['nodelist'][failed]))
1797
        raise errors.OpExecError("ssh/hostname verification failed.")
1798

    
1799
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1800
    # including the node just added
1801
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
1802
    dist_nodes = self.cfg.GetNodeList()
1803
    if not self.op.readd:
1804
      dist_nodes.append(node)
1805
    if myself.name in dist_nodes:
1806
      dist_nodes.remove(myself.name)
1807

    
1808
    logger.Debug("Copying hosts and known_hosts to all nodes")
1809
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1810
      result = rpc.call_upload_file(dist_nodes, fname)
1811
      for to_node in dist_nodes:
1812
        if not result[to_node]:
1813
          logger.Error("copy of file %s to node %s failed" %
1814
                       (fname, to_node))
1815

    
1816
    to_copy = []
1817
    if constants.HT_XEN_HVM31 in self.cfg.GetClusterInfo().enabled_hypervisors:
1818
      to_copy.append(constants.VNC_PASSWORD_FILE)
1819
    for fname in to_copy:
1820
      result = rpc.call_upload_file([node], fname)
1821
      if not result[node]:
1822
        logger.Error("could not copy file %s to node %s" % (fname, node))
1823

    
1824
    if self.op.readd:
1825
      self.context.ReaddNode(new_node)
1826
    else:
1827
      self.context.AddNode(new_node)
1828

    
1829

    
1830
class LUQueryClusterInfo(NoHooksLU):
1831
  """Query cluster configuration.
1832

1833
  """
1834
  _OP_REQP = []
1835
  REQ_MASTER = False
1836
  REQ_BGL = False
1837

    
1838
  def ExpandNames(self):
1839
    self.needed_locks = {}
1840

    
1841
  def CheckPrereq(self):
1842
    """No prerequsites needed for this LU.
1843

1844
    """
1845
    pass
1846

    
1847
  def Exec(self, feedback_fn):
1848
    """Return cluster config.
1849

1850
    """
1851
    result = {
1852
      "name": self.cfg.GetClusterName(),
1853
      "software_version": constants.RELEASE_VERSION,
1854
      "protocol_version": constants.PROTOCOL_VERSION,
1855
      "config_version": constants.CONFIG_VERSION,
1856
      "os_api_version": constants.OS_API_VERSION,
1857
      "export_version": constants.EXPORT_VERSION,
1858
      "master": self.cfg.GetMasterNode(),
1859
      "architecture": (platform.architecture()[0], platform.machine()),
1860
      "hypervisor_type": self.cfg.GetHypervisorType(),
1861
      "enabled_hypervisors": self.cfg.GetClusterInfo().enabled_hypervisors,
1862
      }
1863

    
1864
    return result
1865

    
1866

    
1867
class LUQueryConfigValues(NoHooksLU):
1868
  """Return configuration values.
1869

1870
  """
1871
  _OP_REQP = []
1872
  REQ_BGL = False
1873

    
1874
  def ExpandNames(self):
1875
    self.needed_locks = {}
1876

    
1877
    static_fields = ["cluster_name", "master_node"]
1878
    _CheckOutputFields(static=static_fields,
1879
                       dynamic=[],
1880
                       selected=self.op.output_fields)
1881

    
1882
  def CheckPrereq(self):
1883
    """No prerequisites.
1884

1885
    """
1886
    pass
1887

    
1888
  def Exec(self, feedback_fn):
1889
    """Dump a representation of the cluster config to the standard output.
1890

1891
    """
1892
    values = []
1893
    for field in self.op.output_fields:
1894
      if field == "cluster_name":
1895
        values.append(self.cfg.GetClusterName())
1896
      elif field == "master_node":
1897
        values.append(self.cfg.GetMasterNode())
1898
      else:
1899
        raise errors.ParameterError(field)
1900
    return values
1901

    
1902

    
1903
class LUActivateInstanceDisks(NoHooksLU):
1904
  """Bring up an instance's disks.
1905

1906
  """
1907
  _OP_REQP = ["instance_name"]
1908
  REQ_BGL = False
1909

    
1910
  def ExpandNames(self):
1911
    self._ExpandAndLockInstance()
1912
    self.needed_locks[locking.LEVEL_NODE] = []
1913
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1914

    
1915
  def DeclareLocks(self, level):
1916
    if level == locking.LEVEL_NODE:
1917
      self._LockInstancesNodes()
1918

    
1919
  def CheckPrereq(self):
1920
    """Check prerequisites.
1921

1922
    This checks that the instance is in the cluster.
1923

1924
    """
1925
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1926
    assert self.instance is not None, \
1927
      "Cannot retrieve locked instance %s" % self.op.instance_name
1928

    
1929
  def Exec(self, feedback_fn):
1930
    """Activate the disks.
1931

1932
    """
1933
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1934
    if not disks_ok:
1935
      raise errors.OpExecError("Cannot activate block devices")
1936

    
1937
    return disks_info
1938

    
1939

    
1940
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1941
  """Prepare the block devices for an instance.
1942

1943
  This sets up the block devices on all nodes.
1944

1945
  Args:
1946
    instance: a ganeti.objects.Instance object
1947
    ignore_secondaries: if true, errors on secondary nodes won't result
1948
                        in an error return from the function
1949

1950
  Returns:
1951
    false if the operation failed
1952
    list of (host, instance_visible_name, node_visible_name) if the operation
1953
         suceeded with the mapping from node devices to instance devices
1954
  """
1955
  device_info = []
1956
  disks_ok = True
1957
  iname = instance.name
1958
  # With the two passes mechanism we try to reduce the window of
1959
  # opportunity for the race condition of switching DRBD to primary
1960
  # before handshaking occured, but we do not eliminate it
1961

    
1962
  # The proper fix would be to wait (with some limits) until the
1963
  # connection has been made and drbd transitions from WFConnection
1964
  # into any other network-connected state (Connected, SyncTarget,
1965
  # SyncSource, etc.)
1966

    
1967
  # 1st pass, assemble on all nodes in secondary mode
1968
  for inst_disk in instance.disks:
1969
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1970
      cfg.SetDiskID(node_disk, node)
1971
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1972
      if not result:
1973
        logger.Error("could not prepare block device %s on node %s"
1974
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1975
        if not ignore_secondaries:
1976
          disks_ok = False
1977

    
1978
  # FIXME: race condition on drbd migration to primary
1979

    
1980
  # 2nd pass, do only the primary node
1981
  for inst_disk in instance.disks:
1982
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1983
      if node != instance.primary_node:
1984
        continue
1985
      cfg.SetDiskID(node_disk, node)
1986
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1987
      if not result:
1988
        logger.Error("could not prepare block device %s on node %s"
1989
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1990
        disks_ok = False
1991
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
1992

    
1993
  # leave the disks configured for the primary node
1994
  # this is a workaround that would be fixed better by
1995
  # improving the logical/physical id handling
1996
  for disk in instance.disks:
1997
    cfg.SetDiskID(disk, instance.primary_node)
1998

    
1999
  return disks_ok, device_info
2000

    
2001

    
2002
def _StartInstanceDisks(cfg, instance, force):
2003
  """Start the disks of an instance.
2004

2005
  """
2006
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
2007
                                           ignore_secondaries=force)
2008
  if not disks_ok:
2009
    _ShutdownInstanceDisks(instance, cfg)
2010
    if force is not None and not force:
2011
      logger.Error("If the message above refers to a secondary node,"
2012
                   " you can retry the operation using '--force'.")
2013
    raise errors.OpExecError("Disk consistency error")
2014

    
2015

    
2016
class LUDeactivateInstanceDisks(NoHooksLU):
2017
  """Shutdown an instance's disks.
2018

2019
  """
2020
  _OP_REQP = ["instance_name"]
2021
  REQ_BGL = False
2022

    
2023
  def ExpandNames(self):
2024
    self._ExpandAndLockInstance()
2025
    self.needed_locks[locking.LEVEL_NODE] = []
2026
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2027

    
2028
  def DeclareLocks(self, level):
2029
    if level == locking.LEVEL_NODE:
2030
      self._LockInstancesNodes()
2031

    
2032
  def CheckPrereq(self):
2033
    """Check prerequisites.
2034

2035
    This checks that the instance is in the cluster.
2036

2037
    """
2038
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2039
    assert self.instance is not None, \
2040
      "Cannot retrieve locked instance %s" % self.op.instance_name
2041

    
2042
  def Exec(self, feedback_fn):
2043
    """Deactivate the disks
2044

2045
    """
2046
    instance = self.instance
2047
    _SafeShutdownInstanceDisks(instance, self.cfg)
2048

    
2049

    
2050
def _SafeShutdownInstanceDisks(instance, cfg):
2051
  """Shutdown block devices of an instance.
2052

2053
  This function checks if an instance is running, before calling
2054
  _ShutdownInstanceDisks.
2055

2056
  """
2057
  ins_l = rpc.call_instance_list([instance.primary_node],
2058
                                 [instance.hypervisor])
2059
  ins_l = ins_l[instance.primary_node]
2060
  if not type(ins_l) is list:
2061
    raise errors.OpExecError("Can't contact node '%s'" %
2062
                             instance.primary_node)
2063

    
2064
  if instance.name in ins_l:
2065
    raise errors.OpExecError("Instance is running, can't shutdown"
2066
                             " block devices.")
2067

    
2068
  _ShutdownInstanceDisks(instance, cfg)
2069

    
2070

    
2071
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
2072
  """Shutdown block devices of an instance.
2073

2074
  This does the shutdown on all nodes of the instance.
2075

2076
  If the ignore_primary is false, errors on the primary node are
2077
  ignored.
2078

2079
  """
2080
  result = True
2081
  for disk in instance.disks:
2082
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2083
      cfg.SetDiskID(top_disk, node)
2084
      if not rpc.call_blockdev_shutdown(node, top_disk):
2085
        logger.Error("could not shutdown block device %s on node %s" %
2086
                     (disk.iv_name, node))
2087
        if not ignore_primary or node != instance.primary_node:
2088
          result = False
2089
  return result
2090

    
2091

    
2092
def _CheckNodeFreeMemory(cfg, node, reason, requested, hypervisor):
2093
  """Checks if a node has enough free memory.
2094

2095
  This function check if a given node has the needed amount of free
2096
  memory. In case the node has less memory or we cannot get the
2097
  information from the node, this function raise an OpPrereqError
2098
  exception.
2099

2100
  @type cfg: C{config.ConfigWriter}
2101
  @param cfg: the ConfigWriter instance from which we get configuration data
2102
  @type node: C{str}
2103
  @param node: the node to check
2104
  @type reason: C{str}
2105
  @param reason: string to use in the error message
2106
  @type requested: C{int}
2107
  @param requested: the amount of memory in MiB to check for
2108
  @type hypervisor: C{str}
2109
  @param hypervisor: the hypervisor to ask for memory stats
2110
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2111
      we cannot check the node
2112

2113
  """
2114
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName(), hypervisor)
2115
  if not nodeinfo or not isinstance(nodeinfo, dict):
2116
    raise errors.OpPrereqError("Could not contact node %s for resource"
2117
                             " information" % (node,))
2118

    
2119
  free_mem = nodeinfo[node].get('memory_free')
2120
  if not isinstance(free_mem, int):
2121
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2122
                             " was '%s'" % (node, free_mem))
2123
  if requested > free_mem:
2124
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2125
                             " needed %s MiB, available %s MiB" %
2126
                             (node, reason, requested, free_mem))
2127

    
2128

    
2129
class LUStartupInstance(LogicalUnit):
2130
  """Starts an instance.
2131

2132
  """
2133
  HPATH = "instance-start"
2134
  HTYPE = constants.HTYPE_INSTANCE
2135
  _OP_REQP = ["instance_name", "force"]
2136
  REQ_BGL = False
2137

    
2138
  def ExpandNames(self):
2139
    self._ExpandAndLockInstance()
2140
    self.needed_locks[locking.LEVEL_NODE] = []
2141
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2142

    
2143
  def DeclareLocks(self, level):
2144
    if level == locking.LEVEL_NODE:
2145
      self._LockInstancesNodes()
2146

    
2147
  def BuildHooksEnv(self):
2148
    """Build hooks env.
2149

2150
    This runs on master, primary and secondary nodes of the instance.
2151

2152
    """
2153
    env = {
2154
      "FORCE": self.op.force,
2155
      }
2156
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2157
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2158
          list(self.instance.secondary_nodes))
2159
    return env, nl, nl
2160

    
2161
  def CheckPrereq(self):
2162
    """Check prerequisites.
2163

2164
    This checks that the instance is in the cluster.
2165

2166
    """
2167
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2168
    assert self.instance is not None, \
2169
      "Cannot retrieve locked instance %s" % self.op.instance_name
2170

    
2171
    # check bridges existance
2172
    _CheckInstanceBridgesExist(instance)
2173

    
2174
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2175
                         "starting instance %s" % instance.name,
2176
                         instance.memory, instance.hypervisor)
2177

    
2178
  def Exec(self, feedback_fn):
2179
    """Start the instance.
2180

2181
    """
2182
    instance = self.instance
2183
    force = self.op.force
2184
    extra_args = getattr(self.op, "extra_args", "")
2185

    
2186
    self.cfg.MarkInstanceUp(instance.name)
2187

    
2188
    node_current = instance.primary_node
2189

    
2190
    _StartInstanceDisks(self.cfg, instance, force)
2191

    
2192
    if not rpc.call_instance_start(node_current, instance, extra_args):
2193
      _ShutdownInstanceDisks(instance, self.cfg)
2194
      raise errors.OpExecError("Could not start instance")
2195

    
2196

    
2197
class LURebootInstance(LogicalUnit):
2198
  """Reboot an instance.
2199

2200
  """
2201
  HPATH = "instance-reboot"
2202
  HTYPE = constants.HTYPE_INSTANCE
2203
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2204
  REQ_BGL = False
2205

    
2206
  def ExpandNames(self):
2207
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2208
                                   constants.INSTANCE_REBOOT_HARD,
2209
                                   constants.INSTANCE_REBOOT_FULL]:
2210
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2211
                                  (constants.INSTANCE_REBOOT_SOFT,
2212
                                   constants.INSTANCE_REBOOT_HARD,
2213
                                   constants.INSTANCE_REBOOT_FULL))
2214
    self._ExpandAndLockInstance()
2215
    self.needed_locks[locking.LEVEL_NODE] = []
2216
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2217

    
2218
  def DeclareLocks(self, level):
2219
    if level == locking.LEVEL_NODE:
2220
      primary_only = not constants.INSTANCE_REBOOT_FULL
2221
      self._LockInstancesNodes(primary_only=primary_only)
2222

    
2223
  def BuildHooksEnv(self):
2224
    """Build hooks env.
2225

2226
    This runs on master, primary and secondary nodes of the instance.
2227

2228
    """
2229
    env = {
2230
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2231
      }
2232
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2233
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2234
          list(self.instance.secondary_nodes))
2235
    return env, nl, nl
2236

    
2237
  def CheckPrereq(self):
2238
    """Check prerequisites.
2239

2240
    This checks that the instance is in the cluster.
2241

2242
    """
2243
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2244
    assert self.instance is not None, \
2245
      "Cannot retrieve locked instance %s" % self.op.instance_name
2246

    
2247
    # check bridges existance
2248
    _CheckInstanceBridgesExist(instance)
2249

    
2250
  def Exec(self, feedback_fn):
2251
    """Reboot the instance.
2252

2253
    """
2254
    instance = self.instance
2255
    ignore_secondaries = self.op.ignore_secondaries
2256
    reboot_type = self.op.reboot_type
2257
    extra_args = getattr(self.op, "extra_args", "")
2258

    
2259
    node_current = instance.primary_node
2260

    
2261
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2262
                       constants.INSTANCE_REBOOT_HARD]:
2263
      if not rpc.call_instance_reboot(node_current, instance,
2264
                                      reboot_type, extra_args):
2265
        raise errors.OpExecError("Could not reboot instance")
2266
    else:
2267
      if not rpc.call_instance_shutdown(node_current, instance):
2268
        raise errors.OpExecError("could not shutdown instance for full reboot")
2269
      _ShutdownInstanceDisks(instance, self.cfg)
2270
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2271
      if not rpc.call_instance_start(node_current, instance, extra_args):
2272
        _ShutdownInstanceDisks(instance, self.cfg)
2273
        raise errors.OpExecError("Could not start instance for full reboot")
2274

    
2275
    self.cfg.MarkInstanceUp(instance.name)
2276

    
2277

    
2278
class LUShutdownInstance(LogicalUnit):
2279
  """Shutdown an instance.
2280

2281
  """
2282
  HPATH = "instance-stop"
2283
  HTYPE = constants.HTYPE_INSTANCE
2284
  _OP_REQP = ["instance_name"]
2285
  REQ_BGL = False
2286

    
2287
  def ExpandNames(self):
2288
    self._ExpandAndLockInstance()
2289
    self.needed_locks[locking.LEVEL_NODE] = []
2290
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2291

    
2292
  def DeclareLocks(self, level):
2293
    if level == locking.LEVEL_NODE:
2294
      self._LockInstancesNodes()
2295

    
2296
  def BuildHooksEnv(self):
2297
    """Build hooks env.
2298

2299
    This runs on master, primary and secondary nodes of the instance.
2300

2301
    """
2302
    env = _BuildInstanceHookEnvByObject(self.instance)
2303
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2304
          list(self.instance.secondary_nodes))
2305
    return env, nl, nl
2306

    
2307
  def CheckPrereq(self):
2308
    """Check prerequisites.
2309

2310
    This checks that the instance is in the cluster.
2311

2312
    """
2313
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2314
    assert self.instance is not None, \
2315
      "Cannot retrieve locked instance %s" % self.op.instance_name
2316

    
2317
  def Exec(self, feedback_fn):
2318
    """Shutdown the instance.
2319

2320
    """
2321
    instance = self.instance
2322
    node_current = instance.primary_node
2323
    self.cfg.MarkInstanceDown(instance.name)
2324
    if not rpc.call_instance_shutdown(node_current, instance):
2325
      logger.Error("could not shutdown instance")
2326

    
2327
    _ShutdownInstanceDisks(instance, self.cfg)
2328

    
2329

    
2330
class LUReinstallInstance(LogicalUnit):
2331
  """Reinstall an instance.
2332

2333
  """
2334
  HPATH = "instance-reinstall"
2335
  HTYPE = constants.HTYPE_INSTANCE
2336
  _OP_REQP = ["instance_name"]
2337
  REQ_BGL = False
2338

    
2339
  def ExpandNames(self):
2340
    self._ExpandAndLockInstance()
2341
    self.needed_locks[locking.LEVEL_NODE] = []
2342
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2343

    
2344
  def DeclareLocks(self, level):
2345
    if level == locking.LEVEL_NODE:
2346
      self._LockInstancesNodes()
2347

    
2348
  def BuildHooksEnv(self):
2349
    """Build hooks env.
2350

2351
    This runs on master, primary and secondary nodes of the instance.
2352

2353
    """
2354
    env = _BuildInstanceHookEnvByObject(self.instance)
2355
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2356
          list(self.instance.secondary_nodes))
2357
    return env, nl, nl
2358

    
2359
  def CheckPrereq(self):
2360
    """Check prerequisites.
2361

2362
    This checks that the instance is in the cluster and is not running.
2363

2364
    """
2365
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2366
    assert instance is not None, \
2367
      "Cannot retrieve locked instance %s" % self.op.instance_name
2368

    
2369
    if instance.disk_template == constants.DT_DISKLESS:
2370
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2371
                                 self.op.instance_name)
2372
    if instance.status != "down":
2373
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2374
                                 self.op.instance_name)
2375
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name,
2376
                                         instance.hypervisor)
2377
    if remote_info:
2378
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2379
                                 (self.op.instance_name,
2380
                                  instance.primary_node))
2381

    
2382
    self.op.os_type = getattr(self.op, "os_type", None)
2383
    if self.op.os_type is not None:
2384
      # OS verification
2385
      pnode = self.cfg.GetNodeInfo(
2386
        self.cfg.ExpandNodeName(instance.primary_node))
2387
      if pnode is None:
2388
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2389
                                   self.op.pnode)
2390
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2391
      if not os_obj:
2392
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2393
                                   " primary node"  % self.op.os_type)
2394

    
2395
    self.instance = instance
2396

    
2397
  def Exec(self, feedback_fn):
2398
    """Reinstall the instance.
2399

2400
    """
2401
    inst = self.instance
2402

    
2403
    if self.op.os_type is not None:
2404
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2405
      inst.os = self.op.os_type
2406
      self.cfg.Update(inst)
2407

    
2408
    _StartInstanceDisks(self.cfg, inst, None)
2409
    try:
2410
      feedback_fn("Running the instance OS create scripts...")
2411
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2412
        raise errors.OpExecError("Could not install OS for instance %s"
2413
                                 " on node %s" %
2414
                                 (inst.name, inst.primary_node))
2415
    finally:
2416
      _ShutdownInstanceDisks(inst, self.cfg)
2417

    
2418

    
2419
class LURenameInstance(LogicalUnit):
2420
  """Rename an instance.
2421

2422
  """
2423
  HPATH = "instance-rename"
2424
  HTYPE = constants.HTYPE_INSTANCE
2425
  _OP_REQP = ["instance_name", "new_name"]
2426

    
2427
  def BuildHooksEnv(self):
2428
    """Build hooks env.
2429

2430
    This runs on master, primary and secondary nodes of the instance.
2431

2432
    """
2433
    env = _BuildInstanceHookEnvByObject(self.instance)
2434
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2435
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2436
          list(self.instance.secondary_nodes))
2437
    return env, nl, nl
2438

    
2439
  def CheckPrereq(self):
2440
    """Check prerequisites.
2441

2442
    This checks that the instance is in the cluster and is not running.
2443

2444
    """
2445
    instance = self.cfg.GetInstanceInfo(
2446
      self.cfg.ExpandInstanceName(self.op.instance_name))
2447
    if instance is None:
2448
      raise errors.OpPrereqError("Instance '%s' not known" %
2449
                                 self.op.instance_name)
2450
    if instance.status != "down":
2451
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2452
                                 self.op.instance_name)
2453
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name,
2454
                                         instance.hypervisor)
2455
    if remote_info:
2456
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2457
                                 (self.op.instance_name,
2458
                                  instance.primary_node))
2459
    self.instance = instance
2460

    
2461
    # new name verification
2462
    name_info = utils.HostInfo(self.op.new_name)
2463

    
2464
    self.op.new_name = new_name = name_info.name
2465
    instance_list = self.cfg.GetInstanceList()
2466
    if new_name in instance_list:
2467
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2468
                                 new_name)
2469

    
2470
    if not getattr(self.op, "ignore_ip", False):
2471
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2472
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2473
                                   (name_info.ip, new_name))
2474

    
2475

    
2476
  def Exec(self, feedback_fn):
2477
    """Reinstall the instance.
2478

2479
    """
2480
    inst = self.instance
2481
    old_name = inst.name
2482

    
2483
    if inst.disk_template == constants.DT_FILE:
2484
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2485

    
2486
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2487
    # Change the instance lock. This is definitely safe while we hold the BGL
2488
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2489
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2490

    
2491
    # re-read the instance from the configuration after rename
2492
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2493

    
2494
    if inst.disk_template == constants.DT_FILE:
2495
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2496
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
2497
                                                old_file_storage_dir,
2498
                                                new_file_storage_dir)
2499

    
2500
      if not result:
2501
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2502
                                 " directory '%s' to '%s' (but the instance"
2503
                                 " has been renamed in Ganeti)" % (
2504
                                 inst.primary_node, old_file_storage_dir,
2505
                                 new_file_storage_dir))
2506

    
2507
      if not result[0]:
2508
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2509
                                 " (but the instance has been renamed in"
2510
                                 " Ganeti)" % (old_file_storage_dir,
2511
                                               new_file_storage_dir))
2512

    
2513
    _StartInstanceDisks(self.cfg, inst, None)
2514
    try:
2515
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2516
                                          "sda", "sdb"):
2517
        msg = ("Could not run OS rename script for instance %s on node %s"
2518
               " (but the instance has been renamed in Ganeti)" %
2519
               (inst.name, inst.primary_node))
2520
        logger.Error(msg)
2521
    finally:
2522
      _ShutdownInstanceDisks(inst, self.cfg)
2523

    
2524

    
2525
class LURemoveInstance(LogicalUnit):
2526
  """Remove an instance.
2527

2528
  """
2529
  HPATH = "instance-remove"
2530
  HTYPE = constants.HTYPE_INSTANCE
2531
  _OP_REQP = ["instance_name", "ignore_failures"]
2532
  REQ_BGL = False
2533

    
2534
  def ExpandNames(self):
2535
    self._ExpandAndLockInstance()
2536
    self.needed_locks[locking.LEVEL_NODE] = []
2537
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2538

    
2539
  def DeclareLocks(self, level):
2540
    if level == locking.LEVEL_NODE:
2541
      self._LockInstancesNodes()
2542

    
2543
  def BuildHooksEnv(self):
2544
    """Build hooks env.
2545

2546
    This runs on master, primary and secondary nodes of the instance.
2547

2548
    """
2549
    env = _BuildInstanceHookEnvByObject(self.instance)
2550
    nl = [self.cfg.GetMasterNode()]
2551
    return env, nl, nl
2552

    
2553
  def CheckPrereq(self):
2554
    """Check prerequisites.
2555

2556
    This checks that the instance is in the cluster.
2557

2558
    """
2559
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2560
    assert self.instance is not None, \
2561
      "Cannot retrieve locked instance %s" % self.op.instance_name
2562

    
2563
  def Exec(self, feedback_fn):
2564
    """Remove the instance.
2565

2566
    """
2567
    instance = self.instance
2568
    logger.Info("shutting down instance %s on node %s" %
2569
                (instance.name, instance.primary_node))
2570

    
2571
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2572
      if self.op.ignore_failures:
2573
        feedback_fn("Warning: can't shutdown instance")
2574
      else:
2575
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2576
                                 (instance.name, instance.primary_node))
2577

    
2578
    logger.Info("removing block devices for instance %s" % instance.name)
2579

    
2580
    if not _RemoveDisks(instance, self.cfg):
2581
      if self.op.ignore_failures:
2582
        feedback_fn("Warning: can't remove instance's disks")
2583
      else:
2584
        raise errors.OpExecError("Can't remove instance's disks")
2585

    
2586
    logger.Info("removing instance %s out of cluster config" % instance.name)
2587

    
2588
    self.cfg.RemoveInstance(instance.name)
2589
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2590

    
2591

    
2592
class LUQueryInstances(NoHooksLU):
2593
  """Logical unit for querying instances.
2594

2595
  """
2596
  _OP_REQP = ["output_fields", "names"]
2597
  REQ_BGL = False
2598

    
2599
  def ExpandNames(self):
2600
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2601
    self.static_fields = frozenset([
2602
      "name", "os", "pnode", "snodes",
2603
      "admin_state", "admin_ram",
2604
      "disk_template", "ip", "mac", "bridge",
2605
      "sda_size", "sdb_size", "vcpus", "tags",
2606
      "network_port", "kernel_path", "initrd_path",
2607
      "hvm_boot_order", "hvm_acpi", "hvm_pae",
2608
      "hvm_cdrom_image_path", "hvm_nic_type",
2609
      "hvm_disk_type", "vnc_bind_address",
2610
      "serial_no", "hypervisor",
2611
      ])
2612
    _CheckOutputFields(static=self.static_fields,
2613
                       dynamic=self.dynamic_fields,
2614
                       selected=self.op.output_fields)
2615

    
2616
    self.needed_locks = {}
2617
    self.share_locks[locking.LEVEL_INSTANCE] = 1
2618
    self.share_locks[locking.LEVEL_NODE] = 1
2619

    
2620
    if self.op.names:
2621
      self.wanted = _GetWantedInstances(self, self.op.names)
2622
    else:
2623
      self.wanted = locking.ALL_SET
2624

    
2625
    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
2626
    if self.do_locking:
2627
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2628
      self.needed_locks[locking.LEVEL_NODE] = []
2629
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2630

    
2631
  def DeclareLocks(self, level):
2632
    if level == locking.LEVEL_NODE and self.do_locking:
2633
      self._LockInstancesNodes()
2634

    
2635
  def CheckPrereq(self):
2636
    """Check prerequisites.
2637

2638
    """
2639
    pass
2640

    
2641
  def Exec(self, feedback_fn):
2642
    """Computes the list of nodes and their attributes.
2643

2644
    """
2645
    all_info = self.cfg.GetAllInstancesInfo()
2646
    if self.do_locking:
2647
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2648
    elif self.wanted != locking.ALL_SET:
2649
      instance_names = self.wanted
2650
      missing = set(instance_names).difference(all_info.keys())
2651
      if missing:
2652
        raise self.OpExecError(
2653
          "Some instances were removed before retrieving their data: %s"
2654
          % missing)
2655
    else:
2656
      instance_names = all_info.keys()
2657
    instance_list = [all_info[iname] for iname in instance_names]
2658

    
2659
    # begin data gathering
2660

    
2661
    nodes = frozenset([inst.primary_node for inst in instance_list])
2662
    hv_list = list(set([inst.hypervisor for inst in instance_list]))
2663

    
2664
    bad_nodes = []
2665
    if self.dynamic_fields.intersection(self.op.output_fields):
2666
      live_data = {}
2667
      node_data = rpc.call_all_instances_info(nodes, hv_list)
2668
      for name in nodes:
2669
        result = node_data[name]
2670
        if result:
2671
          live_data.update(result)
2672
        elif result == False:
2673
          bad_nodes.append(name)
2674
        # else no instance is alive
2675
    else:
2676
      live_data = dict([(name, {}) for name in instance_names])
2677

    
2678
    # end data gathering
2679

    
2680
    output = []
2681
    for instance in instance_list:
2682
      iout = []
2683
      for field in self.op.output_fields:
2684
        if field == "name":
2685
          val = instance.name
2686
        elif field == "os":
2687
          val = instance.os
2688
        elif field == "pnode":
2689
          val = instance.primary_node
2690
        elif field == "snodes":
2691
          val = list(instance.secondary_nodes)
2692
        elif field == "admin_state":
2693
          val = (instance.status != "down")
2694
        elif field == "oper_state":
2695
          if instance.primary_node in bad_nodes:
2696
            val = None
2697
          else:
2698
            val = bool(live_data.get(instance.name))
2699
        elif field == "status":
2700
          if instance.primary_node in bad_nodes:
2701
            val = "ERROR_nodedown"
2702
          else:
2703
            running = bool(live_data.get(instance.name))
2704
            if running:
2705
              if instance.status != "down":
2706
                val = "running"
2707
              else:
2708
                val = "ERROR_up"
2709
            else:
2710
              if instance.status != "down":
2711
                val = "ERROR_down"
2712
              else:
2713
                val = "ADMIN_down"
2714
        elif field == "admin_ram":
2715
          val = instance.memory
2716
        elif field == "oper_ram":
2717
          if instance.primary_node in bad_nodes:
2718
            val = None
2719
          elif instance.name in live_data:
2720
            val = live_data[instance.name].get("memory", "?")
2721
          else:
2722
            val = "-"
2723
        elif field == "disk_template":
2724
          val = instance.disk_template
2725
        elif field == "ip":
2726
          val = instance.nics[0].ip
2727
        elif field == "bridge":
2728
          val = instance.nics[0].bridge
2729
        elif field == "mac":
2730
          val = instance.nics[0].mac
2731
        elif field == "sda_size" or field == "sdb_size":
2732
          disk = instance.FindDisk(field[:3])
2733
          if disk is None:
2734
            val = None
2735
          else:
2736
            val = disk.size
2737
        elif field == "vcpus":
2738
          val = instance.vcpus
2739
        elif field == "tags":
2740
          val = list(instance.GetTags())
2741
        elif field == "serial_no":
2742
          val = instance.serial_no
2743
        elif field in ("network_port", "kernel_path", "initrd_path",
2744
                       "hvm_boot_order", "hvm_acpi", "hvm_pae",
2745
                       "hvm_cdrom_image_path", "hvm_nic_type",
2746
                       "hvm_disk_type", "vnc_bind_address"):
2747
          val = getattr(instance, field, None)
2748
          if val is not None:
2749
            pass
2750
          elif field in ("hvm_nic_type", "hvm_disk_type",
2751
                         "kernel_path", "initrd_path"):
2752
            val = "default"
2753
          else:
2754
            val = "-"
2755
        elif field == "hypervisor":
2756
          val = instance.hypervisor
2757
        else:
2758
          raise errors.ParameterError(field)
2759
        iout.append(val)
2760
      output.append(iout)
2761

    
2762
    return output
2763

    
2764

    
2765
class LUFailoverInstance(LogicalUnit):
2766
  """Failover an instance.
2767

2768
  """
2769
  HPATH = "instance-failover"
2770
  HTYPE = constants.HTYPE_INSTANCE
2771
  _OP_REQP = ["instance_name", "ignore_consistency"]
2772
  REQ_BGL = False
2773

    
2774
  def ExpandNames(self):
2775
    self._ExpandAndLockInstance()
2776
    self.needed_locks[locking.LEVEL_NODE] = []
2777
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2778

    
2779
  def DeclareLocks(self, level):
2780
    if level == locking.LEVEL_NODE:
2781
      self._LockInstancesNodes()
2782

    
2783
  def BuildHooksEnv(self):
2784
    """Build hooks env.
2785

2786
    This runs on master, primary and secondary nodes of the instance.
2787

2788
    """
2789
    env = {
2790
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2791
      }
2792
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2793
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
2794
    return env, nl, nl
2795

    
2796
  def CheckPrereq(self):
2797
    """Check prerequisites.
2798

2799
    This checks that the instance is in the cluster.
2800

2801
    """
2802
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2803
    assert self.instance is not None, \
2804
      "Cannot retrieve locked instance %s" % self.op.instance_name
2805

    
2806
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2807
      raise errors.OpPrereqError("Instance's disk layout is not"
2808
                                 " network mirrored, cannot failover.")
2809

    
2810
    secondary_nodes = instance.secondary_nodes
2811
    if not secondary_nodes:
2812
      raise errors.ProgrammerError("no secondary node but using "
2813
                                   "a mirrored disk template")
2814

    
2815
    target_node = secondary_nodes[0]
2816
    # check memory requirements on the secondary node
2817
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2818
                         instance.name, instance.memory,
2819
                         instance.hypervisor)
2820

    
2821
    # check bridge existance
2822
    brlist = [nic.bridge for nic in instance.nics]
2823
    if not rpc.call_bridges_exist(target_node, brlist):
2824
      raise errors.OpPrereqError("One or more target bridges %s does not"
2825
                                 " exist on destination node '%s'" %
2826
                                 (brlist, target_node))
2827

    
2828
  def Exec(self, feedback_fn):
2829
    """Failover an instance.
2830

2831
    The failover is done by shutting it down on its present node and
2832
    starting it on the secondary.
2833

2834
    """
2835
    instance = self.instance
2836

    
2837
    source_node = instance.primary_node
2838
    target_node = instance.secondary_nodes[0]
2839

    
2840
    feedback_fn("* checking disk consistency between source and target")
2841
    for dev in instance.disks:
2842
      # for drbd, these are drbd over lvm
2843
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2844
        if instance.status == "up" and not self.op.ignore_consistency:
2845
          raise errors.OpExecError("Disk %s is degraded on target node,"
2846
                                   " aborting failover." % dev.iv_name)
2847

    
2848
    feedback_fn("* shutting down instance on source node")
2849
    logger.Info("Shutting down instance %s on node %s" %
2850
                (instance.name, source_node))
2851

    
2852
    if not rpc.call_instance_shutdown(source_node, instance):
2853
      if self.op.ignore_consistency:
2854
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2855
                     " anyway. Please make sure node %s is down"  %
2856
                     (instance.name, source_node, source_node))
2857
      else:
2858
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2859
                                 (instance.name, source_node))
2860

    
2861
    feedback_fn("* deactivating the instance's disks on source node")
2862
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2863
      raise errors.OpExecError("Can't shut down the instance's disks.")
2864

    
2865
    instance.primary_node = target_node
2866
    # distribute new instance config to the other nodes
2867
    self.cfg.Update(instance)
2868

    
2869
    # Only start the instance if it's marked as up
2870
    if instance.status == "up":
2871
      feedback_fn("* activating the instance's disks on target node")
2872
      logger.Info("Starting instance %s on node %s" %
2873
                  (instance.name, target_node))
2874

    
2875
      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2876
                                               ignore_secondaries=True)
2877
      if not disks_ok:
2878
        _ShutdownInstanceDisks(instance, self.cfg)
2879
        raise errors.OpExecError("Can't activate the instance's disks")
2880

    
2881
      feedback_fn("* starting the instance on the target node")
2882
      if not rpc.call_instance_start(target_node, instance, None):
2883
        _ShutdownInstanceDisks(instance, self.cfg)
2884
        raise errors.OpExecError("Could not start instance %s on node %s." %
2885
                                 (instance.name, target_node))
2886

    
2887

    
2888
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2889
  """Create a tree of block devices on the primary node.
2890

2891
  This always creates all devices.
2892

2893
  """
2894
  if device.children:
2895
    for child in device.children:
2896
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2897
        return False
2898

    
2899
  cfg.SetDiskID(device, node)
2900
  new_id = rpc.call_blockdev_create(node, device, device.size,
2901
                                    instance.name, True, info)
2902
  if not new_id:
2903
    return False
2904
  if device.physical_id is None:
2905
    device.physical_id = new_id
2906
  return True
2907

    
2908

    
2909
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2910
  """Create a tree of block devices on a secondary node.
2911

2912
  If this device type has to be created on secondaries, create it and
2913
  all its children.
2914

2915
  If not, just recurse to children keeping the same 'force' value.
2916

2917
  """
2918
  if device.CreateOnSecondary():
2919
    force = True
2920
  if device.children:
2921
    for child in device.children:
2922
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2923
                                        child, force, info):
2924
        return False
2925

    
2926
  if not force:
2927
    return True
2928
  cfg.SetDiskID(device, node)
2929
  new_id = rpc.call_blockdev_create(node, device, device.size,
2930
                                    instance.name, False, info)
2931
  if not new_id:
2932
    return False
2933
  if device.physical_id is None:
2934
    device.physical_id = new_id
2935
  return True
2936

    
2937

    
2938
def _GenerateUniqueNames(cfg, exts):
2939
  """Generate a suitable LV name.
2940

2941
  This will generate a logical volume name for the given instance.
2942

2943
  """
2944
  results = []
2945
  for val in exts:
2946
    new_id = cfg.GenerateUniqueID()
2947
    results.append("%s%s" % (new_id, val))
2948
  return results
2949

    
2950

    
2951
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name,
2952
                         p_minor, s_minor):
2953
  """Generate a drbd8 device complete with its children.
2954

2955
  """
2956
  port = cfg.AllocatePort()
2957
  vgname = cfg.GetVGName()
2958
  shared_secret = cfg.GenerateDRBDSecret()
2959
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2960
                          logical_id=(vgname, names[0]))
2961
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2962
                          logical_id=(vgname, names[1]))
2963
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2964
                          logical_id=(primary, secondary, port,
2965
                                      p_minor, s_minor,
2966
                                      shared_secret),
2967
                          children=[dev_data, dev_meta],
2968
                          iv_name=iv_name)
2969
  return drbd_dev
2970

    
2971

    
2972
def _GenerateDiskTemplate(cfg, template_name,
2973
                          instance_name, primary_node,
2974
                          secondary_nodes, disk_sz, swap_sz,
2975
                          file_storage_dir, file_driver):
2976
  """Generate the entire disk layout for a given template type.
2977

2978
  """
2979
  #TODO: compute space requirements
2980

    
2981
  vgname = cfg.GetVGName()
2982
  if template_name == constants.DT_DISKLESS:
2983
    disks = []
2984
  elif template_name == constants.DT_PLAIN:
2985
    if len(secondary_nodes) != 0:
2986
      raise errors.ProgrammerError("Wrong template configuration")
2987

    
2988
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2989
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2990
                           logical_id=(vgname, names[0]),
2991
                           iv_name = "sda")
2992
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2993
                           logical_id=(vgname, names[1]),
2994
                           iv_name = "sdb")
2995
    disks = [sda_dev, sdb_dev]
2996
  elif template_name == constants.DT_DRBD8:
2997
    if len(secondary_nodes) != 1:
2998
      raise errors.ProgrammerError("Wrong template configuration")
2999
    remote_node = secondary_nodes[0]
3000
    (minor_pa, minor_pb,
3001
     minor_sa, minor_sb) = cfg.AllocateDRBDMinor(
3002
      [primary_node, primary_node, remote_node, remote_node], instance_name)
3003

    
3004
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
3005
                                       ".sdb_data", ".sdb_meta"])
3006
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
3007
                                        disk_sz, names[0:2], "sda",
3008
                                        minor_pa, minor_sa)
3009
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
3010
                                        swap_sz, names[2:4], "sdb",
3011
                                        minor_pb, minor_sb)
3012
    disks = [drbd_sda_dev, drbd_sdb_dev]
3013
  elif template_name == constants.DT_FILE:
3014
    if len(secondary_nodes) != 0:
3015
      raise errors.ProgrammerError("Wrong template configuration")
3016

    
3017
    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
3018
                                iv_name="sda", logical_id=(file_driver,
3019
                                "%s/sda" % file_storage_dir))
3020
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
3021
                                iv_name="sdb", logical_id=(file_driver,
3022
                                "%s/sdb" % file_storage_dir))
3023
    disks = [file_sda_dev, file_sdb_dev]
3024
  else:
3025
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3026
  return disks
3027

    
3028

    
3029
def _GetInstanceInfoText(instance):
3030
  """Compute that text that should be added to the disk's metadata.
3031

3032
  """
3033
  return "originstname+%s" % instance.name
3034

    
3035

    
3036
def _CreateDisks(cfg, instance):
3037
  """Create all disks for an instance.
3038

3039
  This abstracts away some work from AddInstance.
3040

3041
  Args:
3042
    instance: the instance object
3043

3044
  Returns:
3045
    True or False showing the success of the creation process
3046

3047
  """
3048
  info = _GetInstanceInfoText(instance)
3049

    
3050
  if instance.disk_template == constants.DT_FILE:
3051
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3052
    result = rpc.call_file_storage_dir_create(instance.primary_node,
3053
                                              file_storage_dir)
3054

    
3055
    if not result:
3056
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
3057
      return False
3058

    
3059
    if not result[0]:
3060
      logger.Error("failed to create directory '%s'" % file_storage_dir)
3061
      return False
3062

    
3063
  for device in instance.disks:
3064
    logger.Info("creating volume %s for instance %s" %
3065
                (device.iv_name, instance.name))
3066
    #HARDCODE
3067
    for secondary_node in instance.secondary_nodes:
3068
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
3069
                                        device, False, info):
3070
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
3071
                     (device.iv_name, device, secondary_node))
3072
        return False
3073
    #HARDCODE
3074
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3075
                                    instance, device, info):
3076
      logger.Error("failed to create volume %s on primary!" %
3077
                   device.iv_name)
3078
      return False
3079

    
3080
  return True
3081

    
3082

    
3083
def _RemoveDisks(instance, cfg):
3084
  """Remove all disks for an instance.
3085

3086
  This abstracts away some work from `AddInstance()` and
3087
  `RemoveInstance()`. Note that in case some of the devices couldn't
3088
  be removed, the removal will continue with the other ones (compare
3089
  with `_CreateDisks()`).
3090

3091
  Args:
3092
    instance: the instance object
3093

3094
  Returns:
3095
    True or False showing the success of the removal proces
3096

3097
  """
3098
  logger.Info("removing block devices for instance %s" % instance.name)
3099

    
3100
  result = True
3101
  for device in instance.disks:
3102
    for node, disk in device.ComputeNodeTree(instance.primary_node):
3103
      cfg.SetDiskID(disk, node)
3104
      if not rpc.call_blockdev_remove(node, disk):
3105
        logger.Error("could not remove block device %s on node %s,"
3106
                     " continuing anyway" %
3107
                     (device.iv_name, node))
3108
        result = False
3109

    
3110
  if instance.disk_template == constants.DT_FILE:
3111
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3112
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
3113
                                            file_storage_dir):
3114
      logger.Error("could not remove directory '%s'" % file_storage_dir)
3115
      result = False
3116

    
3117
  return result
3118

    
3119

    
3120
def _ComputeDiskSize(disk_template, disk_size, swap_size):
3121
  """Compute disk size requirements in the volume group
3122

3123
  This is currently hard-coded for the two-drive layout.
3124

3125
  """
3126
  # Required free disk space as a function of disk and swap space
3127
  req_size_dict = {
3128
    constants.DT_DISKLESS: None,
3129
    constants.DT_PLAIN: disk_size + swap_size,
3130
    # 256 MB are added for drbd metadata, 128MB for each drbd device
3131
    constants.DT_DRBD8: disk_size + swap_size + 256,
3132
    constants.DT_FILE: None,
3133
  }
3134

    
3135
  if disk_template not in req_size_dict:
3136
    raise errors.ProgrammerError("Disk template '%s' size requirement"
3137
                                 " is unknown" %  disk_template)
3138

    
3139
  return req_size_dict[disk_template]
3140

    
3141

    
3142
class LUCreateInstance(LogicalUnit):
3143
  """Create an instance.
3144

3145
  """
3146
  HPATH = "instance-add"
3147
  HTYPE = constants.HTYPE_INSTANCE
3148
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
3149
              "disk_template", "swap_size", "mode", "start", "vcpus",
3150
              "wait_for_sync", "ip_check", "mac"]
3151
  REQ_BGL = False
3152

    
3153
  def _ExpandNode(self, node):
3154
    """Expands and checks one node name.
3155

3156
    """
3157
    node_full = self.cfg.ExpandNodeName(node)
3158
    if node_full is None:
3159
      raise errors.OpPrereqError("Unknown node %s" % node)
3160
    return node_full
3161

    
3162
  def ExpandNames(self):
3163
    """ExpandNames for CreateInstance.
3164

3165
    Figure out the right locks for instance creation.
3166

3167
    """
3168
    self.needed_locks = {}
3169

    
3170
    # set optional parameters to none if they don't exist
3171
    for attr in ["kernel_path", "initrd_path", "pnode", "snode",
3172
                 "iallocator", "hvm_boot_order", "hvm_acpi", "hvm_pae",
3173
                 "hvm_cdrom_image_path", "hvm_nic_type", "hvm_disk_type",
3174
                 "vnc_bind_address", "hypervisor"]:
3175
      if not hasattr(self.op, attr):
3176
        setattr(self.op, attr, None)
3177

    
3178
    # verify creation mode
3179
    if self.op.mode not in (constants.INSTANCE_CREATE,
3180
                            constants.INSTANCE_IMPORT):
3181
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3182
                                 self.op.mode)
3183
    # disk template and mirror node verification
3184
    if self.op.disk_template not in constants.DISK_TEMPLATES:
3185
      raise errors.OpPrereqError("Invalid disk template name")
3186

    
3187
    #### instance parameters check
3188

    
3189
    # instance name verification
3190
    hostname1 = utils.HostInfo(self.op.instance_name)
3191
    self.op.instance_name = instance_name = hostname1.name
3192

    
3193
    # this is just a preventive check, but someone might still add this
3194
    # instance in the meantime, and creation will fail at lock-add time
3195
    if instance_name in self.cfg.GetInstanceList():
3196
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3197
                                 instance_name)
3198

    
3199
    self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3200

    
3201
    # ip validity checks
3202
    ip = getattr(self.op, "ip", None)
3203
    if ip is None or ip.lower() == "none":
3204
      inst_ip = None
3205
    elif ip.lower() == "auto":
3206
      inst_ip = hostname1.ip
3207
    else:
3208
      if not utils.IsValidIP(ip):
3209
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
3210
                                   " like a valid IP" % ip)
3211
      inst_ip = ip
3212
    self.inst_ip = self.op.ip = inst_ip
3213
    # used in CheckPrereq for ip ping check
3214
    self.check_ip = hostname1.ip
3215

    
3216
    # MAC address verification
3217
    if self.op.mac != "auto":
3218
      if not utils.IsValidMac(self.op.mac.lower()):
3219
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
3220
                                   self.op.mac)
3221

    
3222
    # boot order verification
3223
    if self.op.hvm_boot_order is not None:
3224
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3225
        raise errors.OpPrereqError("invalid boot order specified,"
3226
                                   " must be one or more of [acdn]")
3227
    # file storage checks
3228
    if (self.op.file_driver and
3229
        not self.op.file_driver in constants.FILE_DRIVER):
3230
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3231
                                 self.op.file_driver)
3232

    
3233
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3234
      raise errors.OpPrereqError("File storage directory path not absolute")
3235

    
3236
    ### Node/iallocator related checks
3237
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
3238
      raise errors.OpPrereqError("One and only one of iallocator and primary"
3239
                                 " node must be given")
3240

    
3241
    if self.op.iallocator:
3242
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3243
    else:
3244
      self.op.pnode = self._ExpandNode(self.op.pnode)
3245
      nodelist = [self.op.pnode]
3246
      if self.op.snode is not None:
3247
        self.op.snode = self._ExpandNode(self.op.snode)
3248
        nodelist.append(self.op.snode)
3249
      self.needed_locks[locking.LEVEL_NODE] = nodelist
3250

    
3251
    # in case of import lock the source node too
3252
    if self.op.mode == constants.INSTANCE_IMPORT:
3253
      src_node = getattr(self.op, "src_node", None)
3254
      src_path = getattr(self.op, "src_path", None)
3255

    
3256
      if src_node is None or src_path is None:
3257
        raise errors.OpPrereqError("Importing an instance requires source"
3258
                                   " node and path options")
3259

    
3260
      if not os.path.isabs(src_path):
3261
        raise errors.OpPrereqError("The source path must be absolute")
3262

    
3263
      self.op.src_node = src_node = self._ExpandNode(src_node)
3264
      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3265
        self.needed_locks[locking.LEVEL_NODE].append(src_node)
3266

    
3267
    else: # INSTANCE_CREATE
3268
      if getattr(self.op, "os_type", None) is None:
3269
        raise errors.OpPrereqError("No guest OS specified")
3270

    
3271
  def _RunAllocator(self):
3272
    """Run the allocator based on input opcode.
3273

3274
    """
3275
    disks = [{"size": self.op.disk_size, "mode": "w"},
3276
             {"size": self.op.swap_size, "mode": "w"}]
3277
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3278
             "bridge": self.op.bridge}]
3279
    ial = IAllocator(self.cfg,
3280
                     mode=constants.IALLOCATOR_MODE_ALLOC,
3281
                     name=self.op.instance_name,
3282
                     disk_template=self.op.disk_template,
3283
                     tags=[],
3284
                     os=self.op.os_type,
3285
                     vcpus=self.op.vcpus,
3286
                     mem_size=self.op.mem_size,
3287
                     disks=disks,
3288
                     nics=nics,
3289
                     )
3290

    
3291
    ial.Run(self.op.iallocator)
3292

    
3293
    if not ial.success:
3294
      raise errors.OpPrereqError("Can't compute nodes using"
3295
                                 " iallocator '%s': %s" % (self.op.iallocator,
3296
                                                           ial.info))
3297
    if len(ial.nodes) != ial.required_nodes:
3298
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3299
                                 " of nodes (%s), required %s" %
3300
                                 (self.op.iallocator, len(ial.nodes),
3301
                                  ial.required_nodes))
3302
    self.op.pnode = ial.nodes[0]
3303
    logger.ToStdout("Selected nodes for the instance: %s" %
3304
                    (", ".join(ial.nodes),))
3305
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3306
                (self.op.instance_name, self.op.iallocator, ial.nodes))
3307
    if ial.required_nodes == 2:
3308
      self.op.snode = ial.nodes[1]
3309

    
3310
  def BuildHooksEnv(self):
3311
    """Build hooks env.
3312

3313
    This runs on master, primary and secondary nodes of the instance.
3314

3315
    """
3316
    env = {
3317
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3318
      "INSTANCE_DISK_SIZE": self.op.disk_size,
3319
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
3320
      "INSTANCE_ADD_MODE": self.op.mode,
3321
      }
3322
    if self.op.mode == constants.INSTANCE_IMPORT:
3323
      env["INSTANCE_SRC_NODE"] = self.op.src_node
3324
      env["INSTANCE_SRC_PATH"] = self.op.src_path
3325
      env["INSTANCE_SRC_IMAGE"] = self.src_image
3326

    
3327
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3328
      primary_node=self.op.pnode,
3329
      secondary_nodes=self.secondaries,
3330
      status=self.instance_status,
3331
      os_type=self.op.os_type,
3332
      memory=self.op.mem_size,
3333
      vcpus=self.op.vcpus,
3334
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3335
    ))
3336

    
3337
    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3338
          self.secondaries)
3339
    return env, nl, nl
3340

    
3341

    
3342
  def CheckPrereq(self):
3343
    """Check prerequisites.
3344

3345
    """
3346
    if (not self.cfg.GetVGName() and
3347
        self.op.disk_template not in constants.DTS_NOT_LVM):
3348
      raise errors.OpPrereqError("Cluster does not support lvm-based"
3349
                                 " instances")
3350

    
3351
    # cheap checks (from the config only)
3352

    
3353
    if self.op.hypervisor is None:
3354
      self.op.hypervisor = self.cfg.GetHypervisorType()
3355

    
3356
    enabled_hvs = self.cfg.GetClusterInfo().enabled_hypervisors
3357
    if self.op.hypervisor not in enabled_hvs:
3358
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3359
                                 " cluster (%s)" % (self.op.hypervisor,
3360
                                  ",".join(enabled_hvs)))
3361

    
3362
    # costly checks (from nodes)
3363

    
3364
    if self.op.mode == constants.INSTANCE_IMPORT:
3365
      src_node = self.op.src_node
3366
      src_path = self.op.src_path
3367

    
3368
      export_info = rpc.call_export_info(src_node, src_path)
3369

    
3370
      if not export_info:
3371
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
3372

    
3373
      if not export_info.has_section(constants.INISECT_EXP):
3374
        raise errors.ProgrammerError("Corrupted export config")
3375

    
3376
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
3377
      if (int(ei_version) != constants.EXPORT_VERSION):
3378
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3379
                                   (ei_version, constants.EXPORT_VERSION))
3380

    
3381
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3382
        raise errors.OpPrereqError("Can't import instance with more than"
3383
                                   " one data disk")
3384

    
3385
      # FIXME: are the old os-es, disk sizes, etc. useful?
3386
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3387
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3388
                                                         'disk0_dump'))
3389
      self.src_image = diskimage
3390

    
3391
    # ip ping checks (we use the same ip that was resolved in ExpandNames)
3392

    
3393
    if self.op.start and not self.op.ip_check:
3394
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3395
                                 " adding an instance in start mode")
3396

    
3397
    if self.op.ip_check:
3398
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3399
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3400
                                   (self.check_ip, instance_name))
3401

    
3402
    # bridge verification
3403
    bridge = getattr(self.op, "bridge", None)
3404
    if bridge is None:
3405
      self.op.bridge = self.cfg.GetDefBridge()
3406
    else:
3407
      self.op.bridge = bridge
3408

    
3409
    #### allocator run
3410

    
3411
    if self.op.iallocator is not None:
3412
      self._RunAllocator()
3413

    
3414
    #### node related checks
3415

    
3416
    # check primary node
3417
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3418
    assert self.pnode is not None, \
3419
      "Cannot retrieve locked node %s" % self.op.pnode
3420
    self.secondaries = []
3421

    
3422
    # mirror node verification
3423
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3424
      if self.op.snode is None:
3425
        raise errors.OpPrereqError("The networked disk templates need"
3426
                                   " a mirror node")
3427
      if self.op.snode == pnode.name:
3428
        raise errors.OpPrereqError("The secondary node cannot be"
3429
                                   " the primary node.")
3430
      self.secondaries.append(self.op.snode)
3431

    
3432
    req_size = _ComputeDiskSize(self.op.disk_template,
3433
                                self.op.disk_size, self.op.swap_size)
3434

    
3435
    # Check lv size requirements
3436
    if req_size is not None:
3437
      nodenames = [pnode.name] + self.secondaries
3438
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3439
                                    self.op.hypervisor)
3440
      for node in nodenames:
3441
        info = nodeinfo.get(node, None)
3442
        if not info:
3443
          raise errors.OpPrereqError("Cannot get current information"
3444
                                     " from node '%s'" % node)
3445
        vg_free = info.get('vg_free', None)
3446
        if not isinstance(vg_free, int):
3447
          raise errors.OpPrereqError("Can't compute free disk space on"
3448
                                     " node %s" % node)
3449
        if req_size > info['vg_free']:
3450
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3451
                                     " %d MB available, %d MB required" %
3452
                                     (node, info['vg_free'], req_size))
3453

    
3454
    # os verification
3455
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3456
    if not os_obj:
3457
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3458
                                 " primary node"  % self.op.os_type)
3459

    
3460
    if self.op.kernel_path == constants.VALUE_NONE:
3461
      raise errors.OpPrereqError("Can't set instance kernel to none")
3462

    
3463
    # bridge check on primary node
3464
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3465
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3466
                                 " destination node '%s'" %
3467
                                 (self.op.bridge, pnode.name))
3468

    
3469
    # memory check on primary node
3470
    if self.op.start:
3471
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3472
                           "creating instance %s" % self.op.instance_name,
3473
                           self.op.mem_size, self.op.hypervisor)
3474

    
3475
    # hvm_cdrom_image_path verification
3476
    if self.op.hvm_cdrom_image_path is not None:
3477
      # FIXME (als): shouldn't these checks happen on the destination node?
3478
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
3479
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
3480
                                   " be an absolute path or None, not %s" %
3481
                                   self.op.hvm_cdrom_image_path)
3482
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
3483
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
3484
                                   " regular file or a symlink pointing to"
3485
                                   " an existing regular file, not %s" %
3486
                                   self.op.hvm_cdrom_image_path)
3487

    
3488
    # vnc_bind_address verification
3489
    if self.op.vnc_bind_address is not None:
3490
      if not utils.IsValidIP(self.op.vnc_bind_address):
3491
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3492
                                   " like a valid IP address" %
3493
                                   self.op.vnc_bind_address)
3494

    
3495
    # Xen HVM device type checks
3496
    if self.op.hypervisor == constants.HT_XEN_HVM31:
3497
      if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
3498
        raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
3499
                                   " hypervisor" % self.op.hvm_nic_type)
3500
      if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
3501
        raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
3502
                                   " hypervisor" % self.op.hvm_disk_type)
3503

    
3504
    if self.op.start:
3505
      self.instance_status = 'up'
3506
    else:
3507
      self.instance_status = 'down'
3508

    
3509
  def Exec(self, feedback_fn):
3510
    """Create and add the instance to the cluster.
3511

3512
    """
3513
    instance = self.op.instance_name
3514
    pnode_name = self.pnode.name
3515

    
3516
    if self.op.mac == "auto":
3517
      mac_address = self.cfg.GenerateMAC()
3518
    else:
3519
      mac_address = self.op.mac
3520

    
3521
    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3522
    if self.inst_ip is not None:
3523
      nic.ip = self.inst_ip
3524

    
3525
    ht_kind = self.op.hypervisor
3526
    if ht_kind in constants.HTS_REQ_PORT:
3527
      network_port = self.cfg.AllocatePort()
3528
    else:
3529
      network_port = None
3530

    
3531
    if self.op.vnc_bind_address is None:
3532
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3533

    
3534
    # this is needed because os.path.join does not accept None arguments
3535
    if self.op.file_storage_dir is None:
3536
      string_file_storage_dir = ""
3537
    else:
3538
      string_file_storage_dir = self.op.file_storage_dir
3539

    
3540
    # build the full file storage dir path
3541
    file_storage_dir = os.path.normpath(os.path.join(
3542
                                        self.cfg.GetFileStorageDir(),
3543
                                        string_file_storage_dir, instance))
3544

    
3545

    
3546
    disks = _GenerateDiskTemplate(self.cfg,
3547
                                  self.op.disk_template,
3548
                                  instance, pnode_name,
3549
                                  self.secondaries, self.op.disk_size,
3550
                                  self.op.swap_size,
3551
                                  file_storage_dir,
3552
                                  self.op.file_driver)
3553

    
3554
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3555
                            primary_node=pnode_name,
3556
                            memory=self.op.mem_size,
3557
                            vcpus=self.op.vcpus,
3558
                            nics=[nic], disks=disks,
3559
                            disk_template=self.op.disk_template,
3560
                            status=self.instance_status,
3561
                            network_port=network_port,
3562
                            kernel_path=self.op.kernel_path,
3563
                            initrd_path=self.op.initrd_path,
3564
                            hvm_boot_order=self.op.hvm_boot_order,
3565
                            hvm_acpi=self.op.hvm_acpi,
3566
                            hvm_pae=self.op.hvm_pae,
3567
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3568
                            vnc_bind_address=self.op.vnc_bind_address,
3569
                            hvm_nic_type=self.op.hvm_nic_type,
3570
                            hvm_disk_type=self.op.hvm_disk_type,
3571
                            hypervisor=self.op.hypervisor,
3572
                            )
3573

    
3574
    feedback_fn("* creating instance disks...")
3575
    if not _CreateDisks(self.cfg, iobj):
3576
      _RemoveDisks(iobj, self.cfg)
3577
      self.cfg.ReleaseDRBDMinors(instance)
3578
      raise errors.OpExecError("Device creation failed, reverting...")
3579

    
3580
    feedback_fn("adding instance %s to cluster config" % instance)
3581

    
3582
    self.cfg.AddInstance(iobj)
3583
    # Declare that we don't want to remove the instance lock anymore, as we've
3584
    # added the instance to the config
3585
    del self.remove_locks[locking.LEVEL_INSTANCE]
3586
    # Remove the temp. assignements for the instance's drbds
3587
    self.cfg.ReleaseDRBDMinors(instance)
3588

    
3589
    if self.op.wait_for_sync:
3590
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3591
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3592
      # make sure the disks are not degraded (still sync-ing is ok)
3593
      time.sleep(15)
3594
      feedback_fn("* checking mirrors status")
3595
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3596
    else:
3597
      disk_abort = False
3598

    
3599
    if disk_abort:
3600
      _RemoveDisks(iobj, self.cfg)
3601
      self.cfg.RemoveInstance(iobj.name)
3602
      # Make sure the instance lock gets removed
3603
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
3604
      raise errors.OpExecError("There are some degraded disks for"
3605
                               " this instance")
3606

    
3607
    feedback_fn("creating os for instance %s on node %s" %
3608
                (instance, pnode_name))
3609

    
3610
    if iobj.disk_template != constants.DT_DISKLESS:
3611
      if self.op.mode == constants.INSTANCE_CREATE:
3612
        feedback_fn("* running the instance OS create scripts...")
3613
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3614
          raise errors.OpExecError("could not add os for instance %s"
3615
                                   " on node %s" %
3616
                                   (instance, pnode_name))
3617

    
3618
      elif self.op.mode == constants.INSTANCE_IMPORT:
3619
        feedback_fn("* running the instance OS import scripts...")
3620
        src_node = self.op.src_node
3621
        src_image = self.src_image
3622
        cluster_name = self.cfg.GetClusterName()
3623
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3624
                                           src_node, src_image, cluster_name):
3625
          raise errors.OpExecError("Could not import os for instance"
3626
                                   " %s on node %s" %
3627
                                   (instance, pnode_name))
3628
      else:
3629
        # also checked in the prereq part
3630
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3631
                                     % self.op.mode)
3632

    
3633
    if self.op.start:
3634
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3635
      feedback_fn("* starting instance...")
3636
      if not rpc.call_instance_start(pnode_name, iobj, None):
3637
        raise errors.OpExecError("Could not start instance")
3638

    
3639

    
3640
class LUConnectConsole(NoHooksLU):
3641
  """Connect to an instance's console.
3642

3643
  This is somewhat special in that it returns the command line that
3644
  you need to run on the master node in order to connect to the
3645
  console.
3646

3647
  """
3648
  _OP_REQP = ["instance_name"]
3649
  REQ_BGL = False
3650

    
3651
  def ExpandNames(self):
3652
    self._ExpandAndLockInstance()
3653

    
3654
  def CheckPrereq(self):
3655
    """Check prerequisites.
3656

3657
    This checks that the instance is in the cluster.
3658

3659
    """
3660
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3661
    assert self.instance is not None, \
3662
      "Cannot retrieve locked instance %s" % self.op.instance_name
3663

    
3664
  def Exec(self, feedback_fn):
3665
    """Connect to the console of an instance
3666

3667
    """
3668
    instance = self.instance
3669
    node = instance.primary_node
3670

    
3671
    node_insts = rpc.call_instance_list([node],
3672
                                        [instance.hypervisor])[node]
3673
    if node_insts is False:
3674
      raise errors.OpExecError("Can't connect to node %s." % node)
3675

    
3676
    if instance.name not in node_insts:
3677
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3678

    
3679
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3680

    
3681
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
3682
    console_cmd = hyper.GetShellCommandForConsole(instance)
3683

    
3684
    # build ssh cmdline
3685
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3686

    
3687

    
3688
class LUReplaceDisks(LogicalUnit):
3689
  """Replace the disks of an instance.
3690

3691
  """
3692
  HPATH = "mirrors-replace"
3693
  HTYPE = constants.HTYPE_INSTANCE
3694
  _OP_REQP = ["instance_name", "mode", "disks"]
3695
  REQ_BGL = False
3696

    
3697
  def ExpandNames(self):
3698
    self._ExpandAndLockInstance()
3699

    
3700
    if not hasattr(self.op, "remote_node"):
3701
      self.op.remote_node = None
3702

    
3703
    ia_name = getattr(self.op, "iallocator", None)
3704
    if ia_name is not None:
3705
      if self.op.remote_node is not None:
3706
        raise errors.OpPrereqError("Give either the iallocator or the new"
3707
                                   " secondary, not both")
3708
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3709
    elif self.op.remote_node is not None:
3710
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3711
      if remote_node is None:
3712
        raise errors.OpPrereqError("Node '%s' not known" %
3713
                                   self.op.remote_node)
3714
      self.op.remote_node = remote_node
3715
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
3716
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3717
    else:
3718
      self.needed_locks[locking.LEVEL_NODE] = []
3719
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3720

    
3721
  def DeclareLocks(self, level):
3722
    # If we're not already locking all nodes in the set we have to declare the
3723
    # instance's primary/secondary nodes.
3724
    if (level == locking.LEVEL_NODE and
3725
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
3726
      self._LockInstancesNodes()
3727

    
3728
  def _RunAllocator(self):
3729
    """Compute a new secondary node using an IAllocator.
3730

3731
    """
3732
    ial = IAllocator(self.cfg,
3733
                     mode=constants.IALLOCATOR_MODE_RELOC,
3734
                     name=self.op.instance_name,
3735
                     relocate_from=[self.sec_node])
3736

    
3737
    ial.Run(self.op.iallocator)
3738

    
3739
    if not ial.success:
3740
      raise errors.OpPrereqError("Can't compute nodes using"
3741
                                 " iallocator '%s': %s" % (self.op.iallocator,
3742
                                                           ial.info))
3743
    if len(ial.nodes) != ial.required_nodes:
3744
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3745
                                 " of nodes (%s), required %s" %
3746
                                 (len(ial.nodes), ial.required_nodes))
3747
    self.op.remote_node = ial.nodes[0]
3748
    logger.ToStdout("Selected new secondary for the instance: %s" %
3749
                    self.op.remote_node)
3750

    
3751
  def BuildHooksEnv(self):
3752
    """Build hooks env.
3753

3754
    This runs on the master, the primary and all the secondaries.
3755

3756
    """
3757
    env = {
3758
      "MODE": self.op.mode,
3759
      "NEW_SECONDARY": self.op.remote_node,
3760
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3761
      }
3762
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3763
    nl = [
3764
      self.cfg.GetMasterNode(),
3765
      self.instance.primary_node,
3766
      ]
3767
    if self.op.remote_node is not None:
3768
      nl.append(self.op.remote_node)
3769
    return env, nl, nl
3770

    
3771
  def CheckPrereq(self):
3772
    """Check prerequisites.
3773

3774
    This checks that the instance is in the cluster.
3775

3776
    """
3777
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3778
    assert instance is not None, \
3779
      "Cannot retrieve locked instance %s" % self.op.instance_name
3780
    self.instance = instance
3781

    
3782
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3783
      raise errors.OpPrereqError("Instance's disk layout is not"
3784
                                 " network mirrored.")
3785

    
3786
    if len(instance.secondary_nodes) != 1:
3787
      raise errors.OpPrereqError("The instance has a strange layout,"
3788
                                 " expected one secondary but found %d" %
3789
                                 len(instance.secondary_nodes))
3790

    
3791
    self.sec_node = instance.secondary_nodes[0]
3792

    
3793
    ia_name = getattr(self.op, "iallocator", None)
3794
    if ia_name is not None:
3795
      self._RunAllocator()
3796

    
3797
    remote_node = self.op.remote_node
3798
    if remote_node is not None:
3799
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3800
      assert self.remote_node_info is not None, \
3801
        "Cannot retrieve locked node %s" % remote_node
3802
    else:
3803
      self.remote_node_info = None
3804
    if remote_node == instance.primary_node:
3805
      raise errors.OpPrereqError("The specified node is the primary node of"
3806
                                 " the instance.")
3807
    elif remote_node == self.sec_node:
3808
      if self.op.mode == constants.REPLACE_DISK_SEC:
3809
        # this is for DRBD8, where we can't execute the same mode of
3810
        # replacement as for drbd7 (no different port allocated)
3811
        raise errors.OpPrereqError("Same secondary given, cannot execute"
3812
                                   " replacement")
3813
    if instance.disk_template == constants.DT_DRBD8:
3814
      if (self.op.mode == constants.REPLACE_DISK_ALL and
3815
          remote_node is not None):
3816
        # switch to replace secondary mode
3817
        self.op.mode = constants.REPLACE_DISK_SEC
3818

    
3819
      if self.op.mode == constants.REPLACE_DISK_ALL:
3820
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3821
                                   " secondary disk replacement, not"
3822
                                   " both at once")
3823
      elif self.op.mode == constants.REPLACE_DISK_PRI:
3824
        if remote_node is not None:
3825
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3826
                                     " the secondary while doing a primary"
3827
                                     " node disk replacement")
3828
        self.tgt_node = instance.primary_node
3829
        self.oth_node = instance.secondary_nodes[0]
3830
      elif self.op.mode == constants.REPLACE_DISK_SEC:
3831
        self.new_node = remote_node # this can be None, in which case
3832
                                    # we don't change the secondary
3833
        self.tgt_node = instance.secondary_nodes[0]
3834
        self.oth_node = instance.primary_node
3835
      else:
3836
        raise errors.ProgrammerError("Unhandled disk replace mode")
3837

    
3838
    for name in self.op.disks:
3839
      if instance.FindDisk(name) is None:
3840
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3841
                                   (name, instance.name))
3842

    
3843
  def _ExecD8DiskOnly(self, feedback_fn):
3844
    """Replace a disk on the primary or secondary for dbrd8.
3845

3846
    The algorithm for replace is quite complicated:
3847
      - for each disk to be replaced:
3848
        - create new LVs on the target node with unique names
3849
        - detach old LVs from the drbd device
3850
        - rename old LVs to name_replaced.<time_t>
3851
        - rename new LVs to old LVs
3852
        - attach the new LVs (with the old names now) to the drbd device
3853
      - wait for sync across all devices
3854
      - for each modified disk:
3855
        - remove old LVs (which have the name name_replaces.<time_t>)
3856

3857
    Failures are not very well handled.
3858

3859
    """
3860
    steps_total = 6
3861
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3862
    instance = self.instance
3863
    iv_names = {}
3864
    vgname = self.cfg.GetVGName()
3865
    # start of work
3866
    cfg = self.cfg
3867
    tgt_node = self.tgt_node
3868
    oth_node = self.oth_node
3869

    
3870
    # Step: check device activation
3871
    self.proc.LogStep(1, steps_total, "check device existence")
3872
    info("checking volume groups")
3873
    my_vg = cfg.GetVGName()
3874
    results = rpc.call_vg_list([oth_node, tgt_node])
3875
    if not results:
3876
      raise errors.OpExecError("Can't list volume groups on the nodes")
3877
    for node in oth_node, tgt_node:
3878
      res = results.get(node, False)
3879
      if not res or my_vg not in res:
3880
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3881
                                 (my_vg, node))
3882
    for dev in instance.disks:
3883
      if not dev.iv_name in self.op.disks:
3884
        continue
3885
      for node in tgt_node, oth_node:
3886
        info("checking %s on %s" % (dev.iv_name, node))
3887
        cfg.SetDiskID(dev, node)
3888
        if not rpc.call_blockdev_find(node, dev):
3889
          raise errors.OpExecError("Can't find device %s on node %s" %
3890
                                   (dev.iv_name, node))
3891

    
3892
    # Step: check other node consistency
3893
    self.proc.LogStep(2, steps_total, "check peer consistency")
3894
    for dev in instance.disks:
3895
      if not dev.iv_name in self.op.disks:
3896
        continue
3897
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3898
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3899
                                   oth_node==instance.primary_node):
3900
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3901
                                 " to replace disks on this node (%s)" %
3902
                                 (oth_node, tgt_node))
3903

    
3904
    # Step: create new storage
3905
    self.proc.LogStep(3, steps_total, "allocate new storage")
3906
    for dev in instance.disks:
3907
      if not dev.iv_name in self.op.disks:
3908
        continue
3909
      size = dev.size
3910
      cfg.SetDiskID(dev, tgt_node)
3911
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3912
      names = _GenerateUniqueNames(cfg, lv_names)
3913
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3914
                             logical_id=(vgname, names[0]))
3915
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3916
                             logical_id=(vgname, names[1]))
3917
      new_lvs = [lv_data, lv_meta]
3918
      old_lvs = dev.children
3919
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3920
      info("creating new local storage on %s for %s" %
3921
           (tgt_node, dev.iv_name))
3922
      # since we *always* want to create this LV, we use the
3923
      # _Create...OnPrimary (which forces the creation), even if we
3924
      # are talking about the secondary node
3925
      for new_lv in new_lvs:
3926
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3927
                                        _GetInstanceInfoText(instance)):
3928
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3929
                                   " node '%s'" %
3930
                                   (new_lv.logical_id[1], tgt_node))
3931

    
3932
    # Step: for each lv, detach+rename*2+attach
3933
    self.proc.LogStep(4, steps_total, "change drbd configuration")
3934
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3935
      info("detaching %s drbd from local storage" % dev.iv_name)
3936
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3937
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3938
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3939
      #dev.children = []
3940
      #cfg.Update(instance)
3941

    
3942
      # ok, we created the new LVs, so now we know we have the needed
3943
      # storage; as such, we proceed on the target node to rename
3944
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3945
      # using the assumption that logical_id == physical_id (which in
3946
      # turn is the unique_id on that node)
3947

    
3948
      # FIXME(iustin): use a better name for the replaced LVs
3949
      temp_suffix = int(time.time())
3950
      ren_fn = lambda d, suff: (d.physical_id[0],
3951
                                d.physical_id[1] + "_replaced-%s" % suff)
3952
      # build the rename list based on what LVs exist on the node
3953
      rlist = []
3954
      for to_ren in old_lvs:
3955
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3956
        if find_res is not None: # device exists
3957
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3958

    
3959
      info("renaming the old LVs on the target node")
3960
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3961
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3962
      # now we rename the new LVs to the old LVs
3963
      info("renaming the new LVs on the target node")
3964
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3965
      if not rpc.call_blockdev_rename(tgt_node, rlist):
3966
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3967

    
3968
      for old, new in zip(old_lvs, new_lvs):
3969
        new.logical_id = old.logical_id
3970
        cfg.SetDiskID(new, tgt_node)
3971

    
3972
      for disk in old_lvs:
3973
        disk.logical_id = ren_fn(disk, temp_suffix)
3974
        cfg.SetDiskID(disk, tgt_node)
3975

    
3976
      # now that the new lvs have the old name, we can add them to the device
3977
      info("adding new mirror component on %s" % tgt_node)
3978
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3979
        for new_lv in new_lvs:
3980
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
3981
            warning("Can't rollback device %s", hint=