
root / lib / cmdlib.py @ d6a02168


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33
import logging
34

    
35
from ganeti import rpc
36
from ganeti import ssh
37
from ganeti import logger
38
from ganeti import utils
39
from ganeti import errors
40
from ganeti import hypervisor
41
from ganeti import locking
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import opcodes
45
from ganeti import serializer
46

    
47

    
48
class LogicalUnit(object):
49
  """Logical Unit base class.
50

51
  Subclasses must follow these rules:
52
    - implement ExpandNames
53
    - implement CheckPrereq
54
    - implement Exec
55
    - implement BuildHooksEnv
56
    - redefine HPATH and HTYPE
57
    - optionally redefine their run requirements:
58
        REQ_MASTER: the LU needs to run on the master node
59
        REQ_WSSTORE: the LU needs a writable SimpleStore
60
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
61

62
  Note that all commands require root permissions.
63

64
  """
65
  HPATH = None
66
  HTYPE = None
67
  _OP_REQP = []
68
  REQ_MASTER = True
69
  REQ_WSSTORE = False
70
  REQ_BGL = True
71

    
72
  def __init__(self, processor, op, context, sstore):
73
    """Constructor for LogicalUnit.
74

75
    This needs to be overridden in derived classes in order to check op
76
    validity.
77

78
    """
79
    self.proc = processor
80
    self.op = op
81
    self.cfg = context.cfg
82
    self.sstore = sstore
83
    self.context = context
84
    # Dicts used to declare locking needs to mcpu
85
    self.needed_locks = None
86
    self.acquired_locks = {}
87
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
88
    self.add_locks = {}
89
    self.remove_locks = {}
90
    # Used to force good behavior when calling helper functions
91
    self.recalculate_locks = {}
92
    self.__ssh = None
93

    
94
    for attr_name in self._OP_REQP:
95
      attr_val = getattr(op, attr_name, None)
96
      if attr_val is None:
97
        raise errors.OpPrereqError("Required parameter '%s' missing" %
98
                                   attr_name)
99

    
100
    if not self.cfg.IsCluster():
101
      raise errors.OpPrereqError("Cluster not initialized yet,"
102
                                 " use 'gnt-cluster init' first.")
103
    if self.REQ_MASTER:
104
      master = self.cfg.GetMasterNode()
105
      if master != utils.HostInfo().name:
106
        raise errors.OpPrereqError("Commands must be run on the master"
107
                                   " node %s" % master)
108

    
109
  def __GetSSH(self):
110
    """Returns the SshRunner object
111

112
    """
113
    if not self.__ssh:
114
      self.__ssh = ssh.SshRunner(self.cfg)
115
    return self.__ssh
116

    
117
  ssh = property(fget=__GetSSH)
118

    
119
  def ExpandNames(self):
120
    """Expand names for this LU.
121

122
    This method is called before starting to execute the opcode, and it should
123
    update all the parameters of the opcode to their canonical form (e.g. a
124
    short node name must be fully expanded after this method has successfully
125
    completed). This way locking, hooks, logging, etc. can work correctly.
126

127
    LUs which implement this method must also populate the self.needed_locks
128
    member, as a dict with lock levels as keys, and a list of needed lock names
129
    as values. Rules:
130
      - Use an empty dict if you don't need any lock
131
      - If you don't need any lock at a particular level omit that level
132
      - Don't put anything for the BGL level
133
      - If you want all locks at a level use locking.ALL_SET as a value
134

135
    If you need to share locks (rather than acquire them exclusively) at one
136
    level you can modify self.share_locks, setting a true value (usually 1) for
137
    that level. By default locks are not shared.
138

139
    Examples:
140
    # Acquire all nodes and one instance
141
    self.needed_locks = {
142
      locking.LEVEL_NODE: locking.ALL_SET,
143
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
144
    }
145
    # Acquire just two nodes
146
    self.needed_locks = {
147
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
148
    }
149
    # Acquire no locks
150
    self.needed_locks = {} # No, you can't leave it to the default value None
151

152
    """
153
    # The implementation of this method is mandatory only if the new LU is
154
    # concurrent, so that old LUs don't need to be changed all at the same
155
    # time.
156
    if self.REQ_BGL:
157
      self.needed_locks = {} # Exclusive LUs don't need locks.
158
    else:
159
      raise NotImplementedError
160

    
161
  def DeclareLocks(self, level):
162
    """Declare LU locking needs for a level
163

164
    While most LUs can just declare their locking needs at ExpandNames time,
165
    sometimes there's the need to calculate some locks after having acquired
166
    the ones before. This function is called just before acquiring locks at a
167
    particular level, but after acquiring the ones at lower levels, and permits
168
    such calculations. It can be used to modify self.needed_locks, and by
169
    default it does nothing.
170

171
    This function is only called if you have something already set in
172
    self.needed_locks for the level.
173

174
    @param level: Locking level which is going to be locked
175
    @type level: member of ganeti.locking.LEVELS
176

177
    """
178

    
179
  def CheckPrereq(self):
180
    """Check prerequisites for this LU.
181

182
    This method should check that the prerequisites for the execution
183
    of this LU are fulfilled. It can do internode communication, but
184
    it should be idempotent - no cluster or system changes are
185
    allowed.
186

187
    The method should raise errors.OpPrereqError in case something is
188
    not fulfilled. Its return value is ignored.
189

190
    This method should also update all the parameters of the opcode to
191
    their canonical form if it hasn't been done by ExpandNames before.
192

193
    """
194
    raise NotImplementedError
195

    
196
  def Exec(self, feedback_fn):
197
    """Execute the LU.
198

199
    This method should implement the actual work. It should raise
200
    errors.OpExecError for failures that are somewhat dealt with in
201
    code, or expected.
202

203
    """
204
    raise NotImplementedError
205

    
206
  def BuildHooksEnv(self):
207
    """Build hooks environment for this LU.
208

209
    This method should return a three-element tuple consisting of: a dict
210
    containing the environment that will be used for running the
211
    specific hook for this LU, a list of node names on which the hook
212
    should run before the execution, and a list of node names on which
213
    the hook should run after the execution.
214

215
    The keys of the dict must not have 'GANETI_' prefixed as this will
216
    be handled in the hooks runner. Also note additional keys will be
217
    added by the hooks runner. If the LU doesn't define any
218
    environment, an empty dict (and not None) should be returned.
219

220
    If there are no nodes to return, use an empty list (and not None).
221

222
    Note that if the HPATH for a LU class is None, this function will
223
    not be called.
224

225
    """
226
    raise NotImplementedError
227

    
228
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
229
    """Notify the LU about the results of its hooks.
230

231
    This method is called every time a hooks phase is executed, and notifies
232
    the Logical Unit about the hooks' result. The LU can then use it to alter
233
    its result based on the hooks.  By default the method does nothing and the
234
    previous result is passed back unchanged but any LU can define it if it
235
    wants to use the local cluster hook-scripts somehow.
236

237
    Args:
238
      phase: the hooks phase that has just been run
239
      hooks_results: the results of the multi-node hooks rpc call
240
      feedback_fn: function to send feedback back to the caller
241
      lu_result: the previous result this LU had, or None in the PRE phase.
242

243
    """
244
    return lu_result
245

    
246
  def _ExpandAndLockInstance(self):
247
    """Helper function to expand and lock an instance.
248

249
    Many LUs that work on an instance take its name in self.op.instance_name
250
    and need to expand it and then declare the expanded name for locking. This
251
    function does it, and then updates self.op.instance_name to the expanded
252
    name. It also initializes needed_locks as a dict, if this hasn't been done
253
    before.
254

255
    """
256
    if self.needed_locks is None:
257
      self.needed_locks = {}
258
    else:
259
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
260
        "_ExpandAndLockInstance called with instance-level locks set"
261
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
262
    if expanded_name is None:
263
      raise errors.OpPrereqError("Instance '%s' not known" %
264
                                  self.op.instance_name)
265
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
266
    self.op.instance_name = expanded_name
267

    
268
  def _LockInstancesNodes(self, primary_only=False):
269
    """Helper function to declare instances' nodes for locking.
270

271
    This function should be called after locking one or more instances to lock
272
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
273
    with all primary or secondary nodes for instances already locked and
274
    present in self.needed_locks[locking.LEVEL_INSTANCE].
275

276
    It should be called from DeclareLocks, and for safety only works if
277
    self.recalculate_locks[locking.LEVEL_NODE] is set.
278

279
    In the future it may grow parameters to just lock some instance's nodes, or
280
    to just lock primaries or secondary nodes, if needed.
281

282
    It should be called from DeclareLocks in a way similar to:
283

284
    if level == locking.LEVEL_NODE:
285
      self._LockInstancesNodes()
286

287
    @type primary_only: boolean
288
    @param primary_only: only lock primary nodes of locked instances
289

290
    """
291
    assert locking.LEVEL_NODE in self.recalculate_locks, \
292
      "_LockInstancesNodes helper function called with no nodes to recalculate"
293

    
294
    # TODO: check if we've really been called with the instance locks held
295

    
296
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
297
    # future we might want to have different behaviors depending on the value
298
    # of self.recalculate_locks[locking.LEVEL_NODE]
299
    wanted_nodes = []
300
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
301
      instance = self.context.cfg.GetInstanceInfo(instance_name)
302
      wanted_nodes.append(instance.primary_node)
303
      if not primary_only:
304
        wanted_nodes.extend(instance.secondary_nodes)
305

    
306
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
307
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
308
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
309
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
310

    
311
    del self.recalculate_locks[locking.LEVEL_NODE]
312

    
313
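# Illustrative sketch only (not part of the original module): a minimal
# concurrent LU following the rules described in the LogicalUnit docstring
# above.  The class name and its "instance_name" opcode field are
# hypothetical; everything it uses (_ExpandAndLockInstance, self.cfg, the
# feedback function) is defined elsewhere in this file.
class _LUExampleCheckInstance(LogicalUnit):
  """Example-only LU: lock one instance and report that it exists."""
  HPATH = None              # no hooks are run for this LU
  HTYPE = None
  _OP_REQP = ["instance_name"]
  REQ_BGL = False           # concurrent LU: ExpandNames must set needed_locks

  def ExpandNames(self):
    # expands self.op.instance_name and declares it at locking.LEVEL_INSTANCE
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)

  def Exec(self, feedback_fn):
    feedback_fn("Instance %s is defined in the cluster" % self.instance.name)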

    
314
class NoHooksLU(LogicalUnit):
315
  """Simple LU which runs no hooks.
316

317
  This LU is intended as a parent for other LogicalUnits which will
318
  run no hooks, in order to reduce duplicate code.
319

320
  """
321
  HPATH = None
322
  HTYPE = None
323

    
324

    
325
def _GetWantedNodes(lu, nodes):
326
  """Returns list of checked and expanded node names.
327

328
  Args:
329
    nodes: List of nodes (strings) or None for all
330

331
  """
332
  if not isinstance(nodes, list):
333
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
334

    
335
  if not nodes:
336
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
337
      " non-empty list of nodes whose name is to be expanded.")
338

    
339
  wanted = []
340
  for name in nodes:
341
    node = lu.cfg.ExpandNodeName(name)
342
    if node is None:
343
      raise errors.OpPrereqError("No such node name '%s'" % name)
344
    wanted.append(node)
345

    
346
  return utils.NiceSort(wanted)
347

    
348

    
349
def _GetWantedInstances(lu, instances):
350
  """Returns list of checked and expanded instance names.
351

352
  Args:
353
    instances: List of instances (strings) or None for all
354

355
  """
356
  if not isinstance(instances, list):
357
    raise errors.OpPrereqError("Invalid argument type 'instances'")
358

    
359
  if instances:
360
    wanted = []
361

    
362
    for name in instances:
363
      instance = lu.cfg.ExpandInstanceName(name)
364
      if instance is None:
365
        raise errors.OpPrereqError("No such instance name '%s'" % name)
366
      wanted.append(instance)
367

    
368
  else:
369
    wanted = lu.cfg.GetInstanceList()
370
  return utils.NiceSort(wanted)
371

    
372

    
373
def _CheckOutputFields(static, dynamic, selected):
374
  """Checks whether all selected fields are valid.
375

376
  Args:
377
    static: Static fields
378
    dynamic: Dynamic fields
379

380
  """
381
  static_fields = frozenset(static)
382
  dynamic_fields = frozenset(dynamic)
383

    
384
  all_fields = static_fields | dynamic_fields
385

    
386
  if not all_fields.issuperset(selected):
387
    raise errors.OpPrereqError("Unknown output fields selected: %s"
388
                               % ",".join(frozenset(selected).
389
                                          difference(all_fields)))
390

    
391

    
392
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
393
                          memory, vcpus, nics):
394
  """Builds instance related env variables for hooks from single variables.
395

396
  Args:
397
    secondary_nodes: List of secondary nodes as strings
398
  """
399
  env = {
400
    "OP_TARGET": name,
401
    "INSTANCE_NAME": name,
402
    "INSTANCE_PRIMARY": primary_node,
403
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
404
    "INSTANCE_OS_TYPE": os_type,
405
    "INSTANCE_STATUS": status,
406
    "INSTANCE_MEMORY": memory,
407
    "INSTANCE_VCPUS": vcpus,
408
  }
409

    
410
  if nics:
411
    nic_count = len(nics)
412
    for idx, (ip, bridge, mac) in enumerate(nics):
413
      if ip is None:
414
        ip = ""
415
      env["INSTANCE_NIC%d_IP" % idx] = ip
416
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
417
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
418
  else:
419
    nic_count = 0
420

    
421
  env["INSTANCE_NIC_COUNT"] = nic_count
422

    
423
  return env
424
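# For illustration (hypothetical values): an instance with one NIC would
# yield an environment roughly like
#   {"OP_TARGET": "inst1.example.com", "INSTANCE_NAME": "inst1.example.com",
#    "INSTANCE_PRIMARY": "node1.example.com",
#    "INSTANCE_SECONDARIES": "node2.example.com",
#    "INSTANCE_OS_TYPE": "debian-etch", "INSTANCE_STATUS": "up",
#    "INSTANCE_MEMORY": 512, "INSTANCE_VCPUS": 1, "INSTANCE_NIC_COUNT": 1,
#    "INSTANCE_NIC0_IP": "", "INSTANCE_NIC0_BRIDGE": "br0",
#    "INSTANCE_NIC0_HWADDR": "aa:00:00:00:00:01"}
# The hooks runner adds the GANETI_ prefix to these keys itself.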

    
425

    
426
def _BuildInstanceHookEnvByObject(instance, override=None):
427
  """Builds instance related env variables for hooks from an object.
428

429
  Args:
430
    instance: objects.Instance object of instance
431
    override: dict of values to override
432
  """
433
  args = {
434
    'name': instance.name,
435
    'primary_node': instance.primary_node,
436
    'secondary_nodes': instance.secondary_nodes,
437
    'os_type': instance.os,
438
    'status': instance.status,
439
    'memory': instance.memory,
440
    'vcpus': instance.vcpus,
441
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
442
  }
443
  if override:
444
    args.update(override)
445
  return _BuildInstanceHookEnv(**args)
446

    
447

    
448
def _CheckInstanceBridgesExist(instance):
449
  """Check that the brigdes needed by an instance exist.
450

451
  """
452
  # check bridges existence
453
  brlist = [nic.bridge for nic in instance.nics]
454
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
455
    raise errors.OpPrereqError("one or more target bridges %s does not"
456
                               " exist on destination node '%s'" %
457
                               (brlist, instance.primary_node))
458

    
459

    
460
class LUDestroyCluster(NoHooksLU):
461
  """Logical unit for destroying the cluster.
462

463
  """
464
  _OP_REQP = []
465

    
466
  def CheckPrereq(self):
467
    """Check prerequisites.
468

469
    This checks whether the cluster is empty.
470

471
    Any errors are signalled by raising errors.OpPrereqError.
472

473
    """
474
    master = self.cfg.GetMasterNode()
475

    
476
    nodelist = self.cfg.GetNodeList()
477
    if len(nodelist) != 1 or nodelist[0] != master:
478
      raise errors.OpPrereqError("There are still %d node(s) in"
479
                                 " this cluster." % (len(nodelist) - 1))
480
    instancelist = self.cfg.GetInstanceList()
481
    if instancelist:
482
      raise errors.OpPrereqError("There are still %d instance(s) in"
483
                                 " this cluster." % len(instancelist))
484

    
485
  def Exec(self, feedback_fn):
486
    """Destroys the cluster.
487

488
    """
489
    master = self.cfg.GetMasterNode()
490
    if not rpc.call_node_stop_master(master, False):
491
      raise errors.OpExecError("Could not disable the master role")
492
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
493
    utils.CreateBackup(priv_key)
494
    utils.CreateBackup(pub_key)
495
    return master
496

    
497

    
498
class LUVerifyCluster(LogicalUnit):
499
  """Verifies the cluster status.
500

501
  """
502
  HPATH = "cluster-verify"
503
  HTYPE = constants.HTYPE_CLUSTER
504
  _OP_REQP = ["skip_checks"]
505
  REQ_BGL = False
506

    
507
  def ExpandNames(self):
508
    self.needed_locks = {
509
      locking.LEVEL_NODE: locking.ALL_SET,
510
      locking.LEVEL_INSTANCE: locking.ALL_SET,
511
    }
512
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
513

    
514
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
515
                  remote_version, feedback_fn):
516
    """Run multiple tests against a node.
517

518
    Test list:
519
      - compares ganeti version
520
      - checks vg existence and size > 20G
521
      - checks config file checksum
522
      - checks ssh to other nodes
523

524
    Args:
525
      node: name of the node to check
526
      file_list: required list of files
527
      local_cksum: dictionary of local files and their checksums
528

529
    """
530
    # compares ganeti version
531
    local_version = constants.PROTOCOL_VERSION
532
    if not remote_version:
533
      feedback_fn("  - ERROR: connection to %s failed" % (node))
534
      return True
535

    
536
    if local_version != remote_version:
537
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
538
                      (local_version, node, remote_version))
539
      return True
540

    
541
    # checks vg existence and size > 20G
542

    
543
    bad = False
544
    if not vglist:
545
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
546
                      (node,))
547
      bad = True
548
    else:
549
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
550
                                            constants.MIN_VG_SIZE)
551
      if vgstatus:
552
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
553
        bad = True
554

    
555
    # checks config file checksum
556
    # checks ssh to any
557

    
558
    if 'filelist' not in node_result:
559
      bad = True
560
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
561
    else:
562
      remote_cksum = node_result['filelist']
563
      for file_name in file_list:
564
        if file_name not in remote_cksum:
565
          bad = True
566
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
567
        elif remote_cksum[file_name] != local_cksum[file_name]:
568
          bad = True
569
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
570

    
571
    if 'nodelist' not in node_result:
572
      bad = True
573
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
574
    else:
575
      if node_result['nodelist']:
576
        bad = True
577
        for node in node_result['nodelist']:
578
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
579
                          (node, node_result['nodelist'][node]))
580
    if 'node-net-test' not in node_result:
581
      bad = True
582
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
583
    else:
584
      if node_result['node-net-test']:
585
        bad = True
586
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
587
        for node in nlist:
588
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
589
                          (node, node_result['node-net-test'][node]))
590

    
591
    hyp_result = node_result.get('hypervisor', None)
592
    if hyp_result is not None:
593
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
594
    return bad
595

    
596
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
597
                      node_instance, feedback_fn):
598
    """Verify an instance.
599

600
    This function checks to see if the required block devices are
601
    available on the instance's node.
602

603
    """
604
    bad = False
605

    
606
    node_current = instanceconfig.primary_node
607

    
608
    node_vol_should = {}
609
    instanceconfig.MapLVsByNode(node_vol_should)
610

    
611
    for node in node_vol_should:
612
      for volume in node_vol_should[node]:
613
        if node not in node_vol_is or volume not in node_vol_is[node]:
614
          feedback_fn("  - ERROR: volume %s missing on node %s" %
615
                          (volume, node))
616
          bad = True
617

    
618
    if not instanceconfig.status == 'down':
619
      if (node_current not in node_instance or
620
          not instance in node_instance[node_current]):
621
        feedback_fn("  - ERROR: instance %s not running on node %s" %
622
                        (instance, node_current))
623
        bad = True
624

    
625
    for node in node_instance:
626
      if (not node == node_current):
627
        if instance in node_instance[node]:
628
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
629
                          (instance, node))
630
          bad = True
631

    
632
    return bad
633

    
634
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
635
    """Verify if there are any unknown volumes in the cluster.
636

637
    The .os, .swap and backup volumes are ignored. All other volumes are
638
    reported as unknown.
639

640
    """
641
    bad = False
642

    
643
    for node in node_vol_is:
644
      for volume in node_vol_is[node]:
645
        if node not in node_vol_should or volume not in node_vol_should[node]:
646
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
647
                      (volume, node))
648
          bad = True
649
    return bad
650

    
651
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
652
    """Verify the list of running instances.
653

654
    This checks what instances are running but unknown to the cluster.
655

656
    """
657
    bad = False
658
    for node in node_instance:
659
      for runninginstance in node_instance[node]:
660
        if runninginstance not in instancelist:
661
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
662
                          (runninginstance, node))
663
          bad = True
664
    return bad
665

    
666
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
667
    """Verify N+1 Memory Resilience.
668

669
    Check that if one single node dies we can still start all the instances it
670
    was primary for.
671

672
    """
673
    bad = False
674

    
675
    for node, nodeinfo in node_info.iteritems():
676
      # This code checks that every node which is now listed as secondary has
677
      # enough memory to host all instances it is supposed to, should a single
678
      # other node in the cluster fail.
679
      # FIXME: not ready for failover to an arbitrary node
680
      # FIXME: does not support file-backed instances
681
      # WARNING: we currently take into account down instances as well as up
682
      # ones, considering that even if they're down someone might want to start
683
      # them even in the event of a node failure.
684
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
685
        needed_mem = 0
686
        for instance in instances:
687
          needed_mem += instance_cfg[instance].memory
688
        if nodeinfo['mfree'] < needed_mem:
689
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
690
                      " failovers should node %s fail" % (node, prinode))
691
          bad = True
692
    return bad
693
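  # Worked example for the check above (hypothetical numbers): if node2 is
  # secondary for three instances whose primary is node1 and which need
  # 1024+512+512 MB in total, the pair (node2, node1) fails N+1 unless node2
  # has at least 2048 MB free, i.e. unless node2 could start all of node1's
  # instances after a failover.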

    
694
  def CheckPrereq(self):
695
    """Check prerequisites.
696

697
    Transform the list of checks we're going to skip into a set and check that
698
    all its members are valid.
699

700
    """
701
    self.skip_set = frozenset(self.op.skip_checks)
702
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
703
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
704

    
705
  def BuildHooksEnv(self):
706
    """Build hooks env.
707

708
    Cluster-Verify hooks are run only in the post phase; their failure causes
709
    their output to be logged in the verify output and the verification to fail.
710

711
    """
712
    all_nodes = self.cfg.GetNodeList()
713
    # TODO: populate the environment with useful information for verify hooks
714
    env = {}
715
    return env, [], all_nodes
716

    
717
  def Exec(self, feedback_fn):
718
    """Verify integrity of cluster, performing various test on nodes.
719

720
    """
721
    bad = False
722
    feedback_fn("* Verifying global settings")
723
    for msg in self.cfg.VerifyConfig():
724
      feedback_fn("  - ERROR: %s" % msg)
725

    
726
    vg_name = self.cfg.GetVGName()
727
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
728
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
729
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
730
    i_non_redundant = [] # Non redundant instances
731
    node_volume = {}
732
    node_instance = {}
733
    node_info = {}
734
    instance_cfg = {}
735

    
736
    # FIXME: verify OS list
737
    # do local checksums
738
    file_names = []
739
    file_names.append(constants.SSL_CERT_FILE)
740
    file_names.append(constants.CLUSTER_CONF_FILE)
741
    local_checksums = utils.FingerprintFiles(file_names)
742

    
743
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
744
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
745
    all_instanceinfo = rpc.call_instance_list(nodelist)
746
    all_vglist = rpc.call_vg_list(nodelist)
747
    node_verify_param = {
748
      'filelist': file_names,
749
      'nodelist': nodelist,
750
      'hypervisor': None,
751
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
752
                        for node in nodeinfo]
753
      }
754
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
755
    all_rversion = rpc.call_version(nodelist)
756
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
757

    
758
    for node in nodelist:
759
      feedback_fn("* Verifying node %s" % node)
760
      result = self._VerifyNode(node, file_names, local_checksums,
761
                                all_vglist[node], all_nvinfo[node],
762
                                all_rversion[node], feedback_fn)
763
      bad = bad or result
764

    
765
      # node_volume
766
      volumeinfo = all_volumeinfo[node]
767

    
768
      if isinstance(volumeinfo, basestring):
769
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
770
                    (node, volumeinfo[-400:].encode('string_escape')))
771
        bad = True
772
        node_volume[node] = {}
773
      elif not isinstance(volumeinfo, dict):
774
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
775
        bad = True
776
        continue
777
      else:
778
        node_volume[node] = volumeinfo
779

    
780
      # node_instance
781
      nodeinstance = all_instanceinfo[node]
782
      if type(nodeinstance) != list:
783
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
784
        bad = True
785
        continue
786

    
787
      node_instance[node] = nodeinstance
788

    
789
      # node_info
790
      nodeinfo = all_ninfo[node]
791
      if not isinstance(nodeinfo, dict):
792
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
793
        bad = True
794
        continue
795

    
796
      try:
797
        node_info[node] = {
798
          "mfree": int(nodeinfo['memory_free']),
799
          "dfree": int(nodeinfo['vg_free']),
800
          "pinst": [],
801
          "sinst": [],
802
          # dictionary holding all instances this node is secondary for,
803
          # grouped by their primary node. Each key is a cluster node, and each
804
          # value is a list of instances which have the key as primary and the
805
          # current node as secondary.  this is handy to calculate N+1 memory
806
          # availability if you can only failover from a primary to its
807
          # secondary.
808
          "sinst-by-pnode": {},
809
        }
810
      except ValueError:
811
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
812
        bad = True
813
        continue
814

    
815
    node_vol_should = {}
816

    
817
    for instance in instancelist:
818
      feedback_fn("* Verifying instance %s" % instance)
819
      inst_config = self.cfg.GetInstanceInfo(instance)
820
      result =  self._VerifyInstance(instance, inst_config, node_volume,
821
                                     node_instance, feedback_fn)
822
      bad = bad or result
823

    
824
      inst_config.MapLVsByNode(node_vol_should)
825

    
826
      instance_cfg[instance] = inst_config
827

    
828
      pnode = inst_config.primary_node
829
      if pnode in node_info:
830
        node_info[pnode]['pinst'].append(instance)
831
      else:
832
        feedback_fn("  - ERROR: instance %s, connection to primary node"
833
                    " %s failed" % (instance, pnode))
834
        bad = True
835

    
836
      # If the instance is non-redundant we cannot survive losing its primary
837
      # node, so we are not N+1 compliant. On the other hand we have no disk
838
      # templates with more than one secondary so that situation is not well
839
      # supported either.
840
      # FIXME: does not support file-backed instances
841
      if len(inst_config.secondary_nodes) == 0:
842
        i_non_redundant.append(instance)
843
      elif len(inst_config.secondary_nodes) > 1:
844
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
845
                    % instance)
846

    
847
      for snode in inst_config.secondary_nodes:
848
        if snode in node_info:
849
          node_info[snode]['sinst'].append(instance)
850
          if pnode not in node_info[snode]['sinst-by-pnode']:
851
            node_info[snode]['sinst-by-pnode'][pnode] = []
852
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
853
        else:
854
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
855
                      " %s failed" % (instance, snode))
856

    
857
    feedback_fn("* Verifying orphan volumes")
858
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
859
                                       feedback_fn)
860
    bad = bad or result
861

    
862
    feedback_fn("* Verifying remaining instances")
863
    result = self._VerifyOrphanInstances(instancelist, node_instance,
864
                                         feedback_fn)
865
    bad = bad or result
866

    
867
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
868
      feedback_fn("* Verifying N+1 Memory redundancy")
869
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
870
      bad = bad or result
871

    
872
    feedback_fn("* Other Notes")
873
    if i_non_redundant:
874
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
875
                  % len(i_non_redundant))
876

    
877
    return not bad
878

    
879
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
880
    """Analize the post-hooks' result, handle it, and send some
881
    nicely-formatted feedback back to the user.
882

883
    Args:
884
      phase: the hooks phase that has just been run
885
      hooks_results: the results of the multi-node hooks rpc call
886
      feedback_fn: function to send feedback back to the caller
887
      lu_result: previous Exec result
888

889
    """
890
    # We only really run POST phase hooks, and are only interested in
891
    # their results
892
    if phase == constants.HOOKS_PHASE_POST:
893
      # Used to change hooks' output to proper indentation
894
      indent_re = re.compile('^', re.M)
895
      feedback_fn("* Hooks Results")
896
      if not hooks_results:
897
        feedback_fn("  - ERROR: general communication failure")
898
        lu_result = 1
899
      else:
900
        for node_name in hooks_results:
901
          show_node_header = True
902
          res = hooks_results[node_name]
903
          if res is False or not isinstance(res, list):
904
            feedback_fn("    Communication failure")
905
            lu_result = 1
906
            continue
907
          for script, hkr, output in res:
908
            if hkr == constants.HKR_FAIL:
909
              # The node header is only shown once, if there are
910
              # failing hooks on that node
911
              if show_node_header:
912
                feedback_fn("  Node %s:" % node_name)
913
                show_node_header = False
914
              feedback_fn("    ERROR: Script %s failed, output:" % script)
915
              output = indent_re.sub('      ', output)
916
              feedback_fn("%s" % output)
917
              lu_result = 1
918

    
919
      return lu_result
920

    
921

    
922
class LUVerifyDisks(NoHooksLU):
923
  """Verifies the cluster disks status.
924

925
  """
926
  _OP_REQP = []
927
  REQ_BGL = False
928

    
929
  def ExpandNames(self):
930
    self.needed_locks = {
931
      locking.LEVEL_NODE: locking.ALL_SET,
932
      locking.LEVEL_INSTANCE: locking.ALL_SET,
933
    }
934
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
935

    
936
  def CheckPrereq(self):
937
    """Check prerequisites.
938

939
    This has no prerequisites.
940

941
    """
942
    pass
943

    
944
  def Exec(self, feedback_fn):
945
    """Verify integrity of cluster disks.
946

947
    """
948
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
949
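    # The four result elements are: nodes whose LV data could not be
    # retrieved, per-node LVM error messages, instances with at least one
    # offline LV, and a map from instance name to its missing (node, volume)
    # pairs.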

    
950
    vg_name = self.cfg.GetVGName()
951
    nodes = utils.NiceSort(self.cfg.GetNodeList())
952
    instances = [self.cfg.GetInstanceInfo(name)
953
                 for name in self.cfg.GetInstanceList()]
954

    
955
    nv_dict = {}
956
    for inst in instances:
957
      inst_lvs = {}
958
      if (inst.status != "up" or
959
          inst.disk_template not in constants.DTS_NET_MIRROR):
960
        continue
961
      inst.MapLVsByNode(inst_lvs)
962
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
963
      for node, vol_list in inst_lvs.iteritems():
964
        for vol in vol_list:
965
          nv_dict[(node, vol)] = inst
966

    
967
    if not nv_dict:
968
      return result
969

    
970
    node_lvs = rpc.call_volume_list(nodes, vg_name)
971

    
972
    to_act = set()
973
    for node in nodes:
974
      # node_volume
975
      lvs = node_lvs[node]
976

    
977
      if isinstance(lvs, basestring):
978
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
979
        res_nlvm[node] = lvs
980
      elif not isinstance(lvs, dict):
981
        logger.Info("connection to node %s failed or invalid data returned" %
982
                    (node,))
983
        res_nodes.append(node)
984
        continue
985

    
986
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
987
        inst = nv_dict.pop((node, lv_name), None)
988
        if (not lv_online and inst is not None
989
            and inst.name not in res_instances):
990
          res_instances.append(inst.name)
991

    
992
    # any leftover items in nv_dict are missing LVs, let's arrange the
993
    # data better
994
    for key, inst in nv_dict.iteritems():
995
      if inst.name not in res_missing:
996
        res_missing[inst.name] = []
997
      res_missing[inst.name].append(key)
998

    
999
    return result
1000

    
1001

    
1002
class LURenameCluster(LogicalUnit):
1003
  """Rename the cluster.
1004

1005
  """
1006
  HPATH = "cluster-rename"
1007
  HTYPE = constants.HTYPE_CLUSTER
1008
  _OP_REQP = ["name"]
1009

    
1010
  def BuildHooksEnv(self):
1011
    """Build hooks env.
1012

1013
    """
1014
    env = {
1015
      "OP_TARGET": self.cfg.GetClusterName(),
1016
      "NEW_NAME": self.op.name,
1017
      }
1018
    mn = self.cfg.GetMasterNode()
1019
    return env, [mn], [mn]
1020

    
1021
  def CheckPrereq(self):
1022
    """Verify that the passed name is a valid one.
1023

1024
    """
1025
    hostname = utils.HostInfo(self.op.name)
1026

    
1027
    new_name = hostname.name
1028
    self.ip = new_ip = hostname.ip
1029
    old_name = self.cfg.GetClusterName()
1030
    old_ip = self.cfg.GetMasterIP()
1031
    if new_name == old_name and new_ip == old_ip:
1032
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1033
                                 " cluster has changed")
1034
    if new_ip != old_ip:
1035
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1036
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1037
                                   " reachable on the network. Aborting." %
1038
                                   new_ip)
1039

    
1040
    self.op.name = new_name
1041

    
1042
  def Exec(self, feedback_fn):
1043
    """Rename the cluster.
1044

1045
    """
1046
    clustername = self.op.name
1047
    ip = self.ip
1048

    
1049
    # shutdown the master IP
1050
    master = self.cfg.GetMasterNode()
1051
    if not rpc.call_node_stop_master(master, False):
1052
      raise errors.OpExecError("Could not disable the master role")
1053

    
1054
    try:
1055
      # modify the sstore
1056
      # TODO: sstore
1057
      ss.SetKey(ss.SS_MASTER_IP, ip)
1058
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1059

    
1060
      # Distribute updated ss config to all nodes
1061
      myself = self.cfg.GetNodeInfo(master)
1062
      dist_nodes = self.cfg.GetNodeList()
1063
      if myself.name in dist_nodes:
1064
        dist_nodes.remove(myself.name)
1065

    
1066
      logger.Debug("Copying updated ssconf data to all nodes")
1067
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1068
        fname = ss.KeyToFilename(keyname)
1069
        result = rpc.call_upload_file(dist_nodes, fname)
1070
        for to_node in dist_nodes:
1071
          if not result[to_node]:
1072
            logger.Error("copy of file %s to node %s failed" %
1073
                         (fname, to_node))
1074
    finally:
1075
      if not rpc.call_node_start_master(master, False):
1076
        logger.Error("Could not re-enable the master role on the master,"
1077
                     " please restart manually.")
1078

    
1079

    
1080
def _RecursiveCheckIfLVMBased(disk):
1081
  """Check if the given disk or its children are lvm-based.
1082

1083
  Args:
1084
    disk: ganeti.objects.Disk object
1085

1086
  Returns:
1087
    boolean indicating whether a LD_LV dev_type was found or not
1088

1089
  """
1090
  if disk.children:
1091
    for chdisk in disk.children:
1092
      if _RecursiveCheckIfLVMBased(chdisk):
1093
        return True
1094
  return disk.dev_type == constants.LD_LV
1095
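# Example (sketch): for a mirrored disk whose two children are logical
# volumes, the recursion above returns True as soon as it reaches an LV
# child; a disk tree that contains no LD_LV device anywhere returns False.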

    
1096

    
1097
class LUSetClusterParams(LogicalUnit):
1098
  """Change the parameters of the cluster.
1099

1100
  """
1101
  HPATH = "cluster-modify"
1102
  HTYPE = constants.HTYPE_CLUSTER
1103
  _OP_REQP = []
1104
  REQ_BGL = False
1105

    
1106
  def ExpandNames(self):
1107
    # FIXME: in the future maybe other cluster params won't require checking on
1108
    # all nodes to be modified.
1109
    self.needed_locks = {
1110
      locking.LEVEL_NODE: locking.ALL_SET,
1111
    }
1112
    self.share_locks[locking.LEVEL_NODE] = 1
1113

    
1114
  def BuildHooksEnv(self):
1115
    """Build hooks env.
1116

1117
    """
1118
    env = {
1119
      "OP_TARGET": self.cfg.GetClusterName(),
1120
      "NEW_VG_NAME": self.op.vg_name,
1121
      }
1122
    mn = self.cfg.GetMasterNode()
1123
    return env, [mn], [mn]
1124

    
1125
  def CheckPrereq(self):
1126
    """Check prerequisites.
1127

1128
    This checks whether the given params don't conflict and
1129
    if the given volume group is valid.
1130

1131
    """
1132
    # FIXME: This only works because there is only one parameter that can be
1133
    # changed or removed.
1134
    if not self.op.vg_name:
1135
      instances = self.cfg.GetAllInstancesInfo().values()
1136
      for inst in instances:
1137
        for disk in inst.disks:
1138
          if _RecursiveCheckIfLVMBased(disk):
1139
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1140
                                       " lvm-based instances exist")
1141

    
1142
    # if vg_name not None, checks given volume group on all nodes
1143
    if self.op.vg_name:
1144
      node_list = self.acquired_locks[locking.LEVEL_NODE]
1145
      vglist = rpc.call_vg_list(node_list)
1146
      for node in node_list:
1147
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1148
                                              constants.MIN_VG_SIZE)
1149
        if vgstatus:
1150
          raise errors.OpPrereqError("Error on node '%s': %s" %
1151
                                     (node, vgstatus))
1152

    
1153
  def Exec(self, feedback_fn):
1154
    """Change the parameters of the cluster.
1155

1156
    """
1157
    if self.op.vg_name != self.cfg.GetVGName():
1158
      self.cfg.SetVGName(self.op.vg_name)
1159
    else:
1160
      feedback_fn("Cluster LVM configuration already in desired"
1161
                  " state, not changing")
1162

    
1163

    
1164
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1165
  """Sleep and poll for an instance's disk to sync.
1166

1167
  """
1168
  if not instance.disks:
1169
    return True
1170

    
1171
  if not oneshot:
1172
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1173

    
1174
  node = instance.primary_node
1175

    
1176
  for dev in instance.disks:
1177
    cfgw.SetDiskID(dev, node)
1178

    
1179
  retries = 0
1180
  while True:
1181
    max_time = 0
1182
    done = True
1183
    cumul_degraded = False
1184
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1185
    if not rstats:
1186
      proc.LogWarning("Can't get any data from node %s" % node)
1187
      retries += 1
1188
      if retries >= 10:
1189
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1190
                                 " aborting." % node)
1191
      time.sleep(6)
1192
      continue
1193
    retries = 0
1194
    for i in range(len(rstats)):
1195
      mstat = rstats[i]
1196
      if mstat is None:
1197
        proc.LogWarning("Can't compute data for node %s/%s" %
1198
                        (node, instance.disks[i].iv_name))
1199
        continue
1200
      # we ignore the ldisk parameter
1201
      perc_done, est_time, is_degraded, _ = mstat
1202
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1203
      if perc_done is not None:
1204
        done = False
1205
        if est_time is not None:
1206
          rem_time = "%d estimated seconds remaining" % est_time
1207
          max_time = est_time
1208
        else:
1209
          rem_time = "no time estimate"
1210
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
1211
                     (instance.disks[i].iv_name, perc_done, rem_time))
1212
    if done or oneshot:
1213
      break
1214

    
1215
    time.sleep(min(60, max_time))
1216

    
1217
  if done:
1218
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1219
  return not cumul_degraded
1220

    
1221

    
1222
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1223
  """Check that mirrors are not degraded.
1224

1225
  The ldisk parameter, if True, will change the test from the
1226
  is_degraded attribute (which represents overall non-ok status for
1227
  the device(s)) to the ldisk (representing the local storage status).
1228

1229
  """
1230
  cfgw.SetDiskID(dev, node)
1231
  if ldisk:
1232
    idx = 6
1233
  else:
1234
    idx = 5
1235

    
1236
  result = True
1237
  if on_primary or dev.AssembleOnSecondary():
1238
    rstats = rpc.call_blockdev_find(node, dev)
1239
    if not rstats:
1240
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1241
      result = False
1242
    else:
1243
      result = result and (not rstats[idx])
1244
  if dev.children:
1245
    for child in dev.children:
1246
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1247

    
1248
  return result
1249

    
1250

    
1251
class LUDiagnoseOS(NoHooksLU):
1252
  """Logical unit for OS diagnose/query.
1253

1254
  """
1255
  _OP_REQP = ["output_fields", "names"]
1256
  REQ_BGL = False
1257

    
1258
  def ExpandNames(self):
1259
    if self.op.names:
1260
      raise errors.OpPrereqError("Selective OS query not supported")
1261

    
1262
    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1263
    _CheckOutputFields(static=[],
1264
                       dynamic=self.dynamic_fields,
1265
                       selected=self.op.output_fields)
1266

    
1267
    # Lock all nodes, in shared mode
1268
    self.needed_locks = {}
1269
    self.share_locks[locking.LEVEL_NODE] = 1
1270
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1271

    
1272
  def CheckPrereq(self):
1273
    """Check prerequisites.
1274

1275
    """
1276

    
1277
  @staticmethod
1278
  def _DiagnoseByOS(node_list, rlist):
1279
    """Remaps a per-node return list into an a per-os per-node dictionary
1280

1281
      Args:
1282
        node_list: a list with the names of all nodes
1283
        rlist: a map with node names as keys and OS objects as values
1284

1285
      Returns:
1286
        map: a map with osnames as keys and as value another map, with
1287
             nodes as
1288
             keys and list of OS objects as values
1289
             e.g. {"debian-etch": {"node1": [<object>,...],
1290
                                   "node2": [<object>,]}
1291
                  }
1292

1293
    """
1294
    all_os = {}
1295
    for node_name, nr in rlist.iteritems():
1296
      if not nr:
1297
        continue
1298
      for os_obj in nr:
1299
        if os_obj.name not in all_os:
1300
          # build a list of nodes for this os containing empty lists
1301
          # for each node in node_list
1302
          all_os[os_obj.name] = {}
1303
          for nname in node_list:
1304
            all_os[os_obj.name][nname] = []
1305
        all_os[os_obj.name][node_name].append(os_obj)
1306
    return all_os
1307

    
1308
  def Exec(self, feedback_fn):
1309
    """Compute the list of OSes.
1310

1311
    """
1312
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1313
    node_data = rpc.call_os_diagnose(node_list)
1314
    if node_data == False:
1315
      raise errors.OpExecError("Can't gather the list of OSes")
1316
    pol = self._DiagnoseByOS(node_list, node_data)
1317
    output = []
1318
    for os_name, os_data in pol.iteritems():
1319
      row = []
1320
      for field in self.op.output_fields:
1321
        if field == "name":
1322
          val = os_name
1323
        elif field == "valid":
1324
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1325
        elif field == "node_status":
1326
          val = {}
1327
          for node_name, nos_list in os_data.iteritems():
1328
            val[node_name] = [(v.status, v.path) for v in nos_list]
1329
        else:
1330
          raise errors.ParameterError(field)
1331
        row.append(val)
1332
      output.append(row)
1333

    
1334
    return output
1335

    
1336

    
1337
class LURemoveNode(LogicalUnit):
1338
  """Logical unit for removing a node.
1339

1340
  """
1341
  HPATH = "node-remove"
1342
  HTYPE = constants.HTYPE_NODE
1343
  _OP_REQP = ["node_name"]
1344

    
1345
  def BuildHooksEnv(self):
1346
    """Build hooks env.
1347

1348
    This doesn't run on the target node in the pre phase as a failed
1349
    node would then be impossible to remove.
1350

1351
    """
1352
    env = {
1353
      "OP_TARGET": self.op.node_name,
1354
      "NODE_NAME": self.op.node_name,
1355
      }
1356
    all_nodes = self.cfg.GetNodeList()
1357
    all_nodes.remove(self.op.node_name)
1358
    return env, all_nodes, all_nodes
1359

    
1360
  def CheckPrereq(self):
1361
    """Check prerequisites.
1362

1363
    This checks:
1364
     - the node exists in the configuration
1365
     - it does not have primary or secondary instances
1366
     - it's not the master
1367

1368
    Any errors are signalled by raising errors.OpPrereqError.
1369

1370
    """
1371
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1372
    if node is None:
1373
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1374

    
1375
    instance_list = self.cfg.GetInstanceList()
1376

    
1377
    masternode = self.cfg.GetMasterNode()
1378
    if node.name == masternode:
1379
      raise errors.OpPrereqError("Node is the master node,"
1380
                                 " you need to failover first.")
1381

    
1382
    for instance_name in instance_list:
1383
      instance = self.cfg.GetInstanceInfo(instance_name)
1384
      if node.name == instance.primary_node:
1385
        raise errors.OpPrereqError("Instance %s still running on the node,"
1386
                                   " please remove first." % instance_name)
1387
      if node.name in instance.secondary_nodes:
1388
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1389
                                   " please remove first." % instance_name)
1390
    self.op.node_name = node.name
1391
    self.node = node
1392

    
1393
  def Exec(self, feedback_fn):
1394
    """Removes the node from the cluster.
1395

1396
    """
1397
    node = self.node
1398
    logger.Info("stopping the node daemon and removing configs from node %s" %
1399
                node.name)
1400

    
1401
    self.context.RemoveNode(node.name)
1402

    
1403
    rpc.call_node_leave_cluster(node.name)
1404

    
1405

    
1406
class LUQueryNodes(NoHooksLU):
1407
  """Logical unit for querying nodes.
1408

1409
  """
1410
  _OP_REQP = ["output_fields", "names"]
1411
  REQ_BGL = False
1412

    
1413
  def ExpandNames(self):
1414
    self.dynamic_fields = frozenset([
1415
      "dtotal", "dfree",
1416
      "mtotal", "mnode", "mfree",
1417
      "bootid",
1418
      "ctotal",
1419
      ])
1420

    
1421
    self.static_fields = frozenset([
1422
      "name", "pinst_cnt", "sinst_cnt",
1423
      "pinst_list", "sinst_list",
1424
      "pip", "sip", "tags",
1425
      "serial_no",
1426
      ])
1427

    
1428
    _CheckOutputFields(static=self.static_fields,
1429
                       dynamic=self.dynamic_fields,
1430
                       selected=self.op.output_fields)
1431

    
1432
    self.needed_locks = {}
1433
    self.share_locks[locking.LEVEL_NODE] = 1
1434

    
1435
    if self.op.names:
1436
      self.wanted = _GetWantedNodes(self, self.op.names)
1437
    else:
1438
      self.wanted = locking.ALL_SET
1439

    
1440
    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
1441
    if self.do_locking:
1442
      # if we don't request only static fields, we need to lock the nodes
1443
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
1444
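    # For example, a query for only static fields such as "name" and "pip"
    # can be answered from the configuration without locking any node, while
    # a dynamic field such as "mfree" forces locking and a live RPC call to
    # the nodes involved.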

    
1445

    
1446
  def CheckPrereq(self):
1447
    """Check prerequisites.
1448

1449
    """
1450
    # The validation of the node list is done in _GetWantedNodes if it is
1451
    # non-empty; if it is empty, there's no validation to do
1452
    pass
1453

    
1454
  def Exec(self, feedback_fn):
1455
    """Computes the list of nodes and their attributes.
1456

1457
    """
1458
    all_info = self.cfg.GetAllNodesInfo()
1459
    if self.do_locking:
1460
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1461
    elif self.wanted != locking.ALL_SET:
1462
      nodenames = self.wanted
1463
      missing = set(nodenames).difference(all_info.keys())
1464
      if missing:
1465
        raise errors.OpExecError(
1466
          "Some nodes were removed before retrieving their data: %s" % missing)
1467
    else:
1468
      nodenames = all_info.keys()
1469
    nodelist = [all_info[name] for name in nodenames]
1470

    
1471
    # begin data gathering
1472

    
1473
    if self.dynamic_fields.intersection(self.op.output_fields):
1474
      live_data = {}
1475
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1476
      for name in nodenames:
1477
        nodeinfo = node_data.get(name, None)
1478
        if nodeinfo:
1479
          live_data[name] = {
1480
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1481
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1482
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1483
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1484
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1485
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1486
            "bootid": nodeinfo['bootid'],
1487
            }
1488
        else:
1489
          live_data[name] = {}
1490
    else:
1491
      live_data = dict.fromkeys(nodenames, {})
1492

    
1493
    node_to_primary = dict([(name, set()) for name in nodenames])
1494
    node_to_secondary = dict([(name, set()) for name in nodenames])
1495

    
1496
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1497
                             "sinst_cnt", "sinst_list"))
1498
    if inst_fields & frozenset(self.op.output_fields):
1499
      instancelist = self.cfg.GetInstanceList()
1500

    
1501
      for instance_name in instancelist:
1502
        inst = self.cfg.GetInstanceInfo(instance_name)
1503
        if inst.primary_node in node_to_primary:
1504
          node_to_primary[inst.primary_node].add(inst.name)
1505
        for secnode in inst.secondary_nodes:
1506
          if secnode in node_to_secondary:
1507
            node_to_secondary[secnode].add(inst.name)
1508

    
1509
    # end data gathering
1510

    
1511
    output = []
1512
    for node in nodelist:
1513
      node_output = []
1514
      for field in self.op.output_fields:
1515
        if field == "name":
1516
          val = node.name
1517
        elif field == "pinst_list":
1518
          val = list(node_to_primary[node.name])
1519
        elif field == "sinst_list":
1520
          val = list(node_to_secondary[node.name])
1521
        elif field == "pinst_cnt":
1522
          val = len(node_to_primary[node.name])
1523
        elif field == "sinst_cnt":
1524
          val = len(node_to_secondary[node.name])
1525
        elif field == "pip":
1526
          val = node.primary_ip
1527
        elif field == "sip":
1528
          val = node.secondary_ip
1529
        elif field == "tags":
1530
          val = list(node.GetTags())
1531
        elif field == "serial_no":
1532
          val = node.serial_no
1533
        elif field in self.dynamic_fields:
1534
          val = live_data[node.name].get(field, None)
1535
        else:
1536
          raise errors.ParameterError(field)
1537
        node_output.append(val)
1538
      output.append(node_output)
1539

    
1540
    return output
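
# Illustrative note (not part of the original module): the static/dynamic
# split above is what decides whether node locks are taken at all.  A query
# for purely static fields, sketched here under the assumption that the
# corresponding OpQueryNodes opcode takes the same output_fields/names
# parameters,
#   opcodes.OpQueryNodes(output_fields=["name", "pip", "sip"], names=[])
# is answered from the configuration alone, while adding a dynamic field
# such as "mfree" sets do_locking and triggers a live rpc.call_node_info().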


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False

  def ExpandNames(self):
    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of volumes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    node_verify_list = [self.cfg.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
    }

    result = rpc.call_node_verify(node_verify_list, node_verify_param)
    for verifier in node_verify_list:
      if not result[verifier]:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                 " for remote verification" % verifier)
      if result[verifier]['nodelist']:
        for failed in result[verifier]['nodelist']:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, result[verifier]['nodelist'][failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = []
    if self.cfg.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      result = rpc.call_upload_file([node], fname)
      if not result[node]:
        logger.Error("could not copy file %s to node %s" % (fname, node))

    if self.op.readd:
      self.context.ReaddNode(new_node)
    else:
      self.context.AddNode(new_node)
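
# Illustrative sketch (assumption, not from this file): this LU backs the
# node-add front-end; assuming the usual gnt-node client and its -s option
# for the secondary IP, a dual-homed node would be added roughly as
#   gnt-node add -s 192.0.2.11 node4.example.com
# which reaches this code as op.node_name and op.secondary_ip.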


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.cfg.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.cfg.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      "hypervisor_type": self.cfg.GetHypervisorType(),
      }

    return result


class LUQueryConfigValues(NoHooksLU):
  """Return configuration values.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

    static_fields = ["cluster_name", "master_node"]
    _CheckOutputFields(static=static_fields,
                       dynamic=[],
                       selected=self.op.output_fields)

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    values = []
    for field in self.op.output_fields:
      if field == "cluster_name":
        values.append(self.cfg.GetClusterName())
      elif field == "master_node":
        values.append(self.cfg.GetMasterNode())
      else:
        raise errors.ParameterError(field)
    return values
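
# Illustrative example (hypothetical values): requesting
#   output_fields = ["cluster_name", "master_node"]
# makes Exec() return something like
#   ["cluster.example.com", "node1.example.com"]
# with the values in the same order as the requested fields.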


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
         succeeded with the mapping from node devices to instance devices
  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two-pass mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info
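
# Illustrative sketch (hypothetical values): for a two-disk DRBD instance the
# device_info returned above is a list of (node, iv_name, result) tuples for
# the primary node only, e.g.
#   [("node1.example.com", "sda", "/dev/drbd0"),
#    ("node1.example.com", "sdb", "/dev/drbd1")]
# where result is whatever rpc.call_blockdev_assemble() returned in pass 2.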


def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    _SafeShutdownInstanceDisks(instance, self.cfg)


def _SafeShutdownInstanceDisks(instance, cfg):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  """
  ins_l = rpc.call_instance_list([instance.primary_node])
  ins_l = ins_l[instance.primary_node]
  if not type(ins_l) is list:
    raise errors.OpExecError("Can't contact node '%s'" %
                             instance.primary_node)

  if instance.name in ins_l:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")

  _ShutdownInstanceDisks(instance, cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are
  ignored.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                             " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                             " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                             " needed %s MiB, available %s MiB" %
                             (node, reason, requested, free_mem))
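
# Illustrative call (mirrors the use in LUStartupInstance.CheckPrereq below):
#   _CheckNodeFreeMemory(self.cfg, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        instance.memory)
# On success it simply returns; on failure it raises OpPrereqError.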


class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")


class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      primary_only = self.op.reboot_type != constants.INSTANCE_REBOOT_FULL
      self._LockInstancesNodes(primary_only=primary_only)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)

class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(instance, self.cfg)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   self.op.pnode)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node"  % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.Update(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)

class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))


  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
                                                old_file_storage_dir,
                                                new_file_storage_dir)

      if not result:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                          "sda", "sdb"):
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)

class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.cfg.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    if not _RemoveDisks(instance, self.cfg):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name

class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    self.static_fields = frozenset([
      "name", "os", "pnode", "snodes",
      "admin_state", "admin_ram",
      "disk_template", "ip", "mac", "bridge",
      "sda_size", "sdb_size", "vcpus", "tags",
      "network_port", "kernel_path", "initrd_path",
      "hvm_boot_order", "hvm_acpi", "hvm_pae",
      "hvm_cdrom_image_path", "hvm_nic_type",
      "hvm_disk_type", "vnc_bind_address",
      "serial_no",
      ])
    _CheckOutputFields(static=self.static_fields,
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.do_locking:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    all_info = self.cfg.GetAllInstancesInfo()
    if self.do_locking:
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
    elif self.wanted != locking.ALL_SET:
      instance_names = self.wanted
      missing = set(instance_names).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some instances were removed before retrieving their data: %s"
          % missing)
    else:
      instance_names = all_info.keys()
    instance_list = [all_info[iname] for iname in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          disk = instance.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        elif field == "vcpus":
          val = instance.vcpus
        elif field == "tags":
          val = list(instance.GetTags())
        elif field == "serial_no":
          val = instance.serial_no
        elif field in ("network_port", "kernel_path", "initrd_path",
                       "hvm_boot_order", "hvm_acpi", "hvm_pae",
                       "hvm_cdrom_image_path", "hvm_nic_type",
                       "hvm_disk_type", "vnc_bind_address"):
          val = getattr(instance, field, None)
          if val is not None:
            pass
          elif field in ("hvm_nic_type", "hvm_disk_type",
                         "kernel_path", "initrd_path"):
            val = "default"
          else:
            val = "-"
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output

class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
                         instance.name, instance.memory)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down"  %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logger.Info("Starting instance %s on node %s" %
                  (instance.name, target_node))

      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))

def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
        return False

  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, True, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True

def _GenerateUniqueNames(cfg, exts):
  """Generate a suitable LV name.

  This will generate a unique logical volume name for each of the
  given extensions.

  """
  results = []
  for val in exts:
    new_id = cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results
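
# Illustrative example (the exact id format depends on GenerateUniqueID):
#   _GenerateUniqueNames(cfg, [".sda", ".sdb"])
# returns something like
#   ["<unique-id>.sda", "<unique-id>.sdb"]
# i.e. one freshly generated id per requested suffix.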


def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name,
                         p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  shared_secret = cfg.GenerateDRBDSecret()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev
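
# Illustrative sketch of the device tree built above:
#   DRBD8 (size=size, nodes=(primary, secondary), minors=(p_minor, s_minor))
#     +- LV data: (vgname, names[0]), size=size
#     +- LV meta: (vgname, names[1]), size=128
# i.e. one drbd8 device whose children are the data and metadata LVs.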


def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
    disks = []
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name = "sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name = "sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    (minor_pa, minor_pb,
     minor_sa, minor_sb) = cfg.AllocateDRBDMinor(
      [primary_node, primary_node, remote_node, remote_node], instance_name)

    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                        disk_sz, names[0:2], "sda",
                                        minor_pa, minor_sa)
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                        swap_sz, names[2:4], "sdb",
                                        minor_pb, minor_sb)
    disks = [drbd_sda_dev, drbd_sdb_dev]
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
                                iv_name="sda", logical_id=(file_driver,
                                "%s/sda" % file_storage_dir))
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
                                iv_name="sdb", logical_id=(file_driver,
                                "%s/sdb" % file_storage_dir))
    disks = [file_sda_dev, file_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks
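
# Layout summary (illustrative, not in the original source): every template
# except diskless yields exactly two instance-visible disks, "sda" (data,
# disk_sz) and "sdb" (swap, swap_sz):
#   - DT_PLAIN:    two plain LVs on the primary node
#   - DT_DRBD8:    two DRBD8 branches (see _GenerateDRBD8Branch above), each
#                  mirrored between primary_node and the single secondary
#   - DT_FILE:     two file-backed disks under file_storage_dir
#   - DT_DISKLESS: an empty disk list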


def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = rpc.call_file_storage_dir_create(instance.primary_node,
                                              file_storage_dir)

    if not result:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False

    if not result[0]:
      logger.Error("failed to create directory '%s'" % file_storage_dir)
      return False

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False

  return True
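
# Typical use (this mirrors LUCreateInstance.Exec further below and is shown
# here only as an illustration): a failure is reported by returning False and
# the caller is expected to roll back with _RemoveDisks:
#
#   if not _CreateDisks(cfg, instance):
#     _RemoveDisks(instance, cfg)
#     raise errors.OpExecError("Device creation failed, reverting...")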


def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
                                            file_storage_dir):
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      result = False

  return result


def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group.

  This is currently hard-coded for the two-drive layout.

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    # 256 MB are added for drbd metadata, 128 MB for each drbd device
    constants.DT_DRBD8: disk_size + swap_size + 256,
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]
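
# Worked example (illustrative, not in the original source): for a 10240 MB
# data disk and 4096 MB of swap, the free space required in the volume group
# would be:
#
#   _ComputeDiskSize(constants.DT_PLAIN, 10240, 4096)  # -> 14336
#   _ComputeDiskSize(constants.DT_DRBD8, 10240, 4096)  # -> 14592 (256 MB
#                                                       #    DRBD metadata)
#   _ComputeDiskSize(constants.DT_FILE, 10240, 4096)   # -> None (no VG space)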


class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync", "ip_check", "mac"]
  REQ_BGL = False

  def _ExpandNode(self, node):
    """Expands and checks one node name.

    """
    node_full = self.cfg.ExpandNodeName(node)
    if node_full is None:
      raise errors.OpPrereqError("Unknown node %s" % node)
    return node_full

  def ExpandNames(self):
    """ExpandNames for CreateInstance.

    Figure out the right locks for instance creation.

    """
    self.needed_locks = {}

    # set optional parameters to None if they don't exist
    for attr in ["kernel_path", "initrd_path", "pnode", "snode",
                 "iallocator", "hvm_boot_order", "hvm_acpi", "hvm_pae",
                 "hvm_cdrom_image_path", "hvm_nic_type", "hvm_disk_type",
                 "vnc_bind_address"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    # verify creation mode
    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)
    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    #### instance parameters check

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)
    self.op.instance_name = instance_name = hostname1.name

    # this is just a preventive check, but someone might still add this
    # instance in the meantime, and creation will fail at lock-add time
    if instance_name in self.cfg.GetInstanceList():
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    self.add_locks[locking.LEVEL_INSTANCE] = instance_name

    # ip validity checks
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = self.op.ip = inst_ip
    # used in CheckPrereq for ip ping check
    self.check_ip = hostname1.ip

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")
    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory path must not"
                                 " be absolute")

    ### Node/iallocator related checks
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.op.pnode = self._ExpandNode(self.op.pnode)
      nodelist = [self.op.pnode]
      if self.op.snode is not None:
        self.op.snode = self._ExpandNode(self.op.snode)
        nodelist.append(self.op.snode)
      self.needed_locks[locking.LEVEL_NODE] = nodelist

    # in case of import lock the source node too
    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)

      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      self.op.src_node = src_node = self._ExpandNode(src_node)
      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
        self.needed_locks[locking.LEVEL_NODE].append(src_node)

    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    disks = [{"size": self.op.disk_size, "mode": "w"},
             {"size": self.op.swap_size, "mode": "w"}]
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
             "bridge": self.op.bridge}]
    ial = IAllocator(self.cfg,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.op.vcpus,
                     mem_size=self.op.mem_size,
                     disks=disks,
                     nics=nics,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    logger.ToStdout("Selected nodes for the instance: %s" %
                    (", ".join(ial.nodes),))
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
                (self.op.instance_name, self.op.iallocator, ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]
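
  # Illustrative note (not in the original source): for the network-mirrored
  # (drbd) disk templates the allocator is expected to return two nodes; the
  # first becomes the primary and the second the secondary.  For all other
  # templates a single node is returned and only self.op.pnode is set.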

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    ))

    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl
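
  # Hook environment sketch (illustrative, not in the original source): for a
  # plain-disk creation the env dict built above would contain, in addition
  # to whatever _BuildInstanceHookEnv adds, entries along the lines of:
  #
  #   INSTANCE_DISK_TEMPLATE=plain
  #   INSTANCE_DISK_SIZE=10240
  #   INSTANCE_SWAP_SIZE=4096
  #   INSTANCE_ADD_MODE=create
  #
  # and the hooks run on the master, the primary node and the secondaries.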


  def CheckPrereq(self):
    """Check prerequisites.

    """
    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = self.op.src_node
      src_path = self.op.src_path

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage

    # ip ping checks (we use the same ip that was resolved in ExpandNames)

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (self.check_ip, self.op.instance_name))

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    #### allocator run

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
    assert self.pnode is not None, \
      "Cannot retrieve locked node %s" % self.op.pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if self.op.snode is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")
      if self.op.snode == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(self.op.snode)

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node" % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")

    # bridge check on primary node
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.op.mem_size)

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      # FIXME (als): shouldn't these checks happen on the destination node?
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    # Xen HVM device type checks
    if self.cfg.GetHypervisorType() == constants.HT_XEN_HVM31:
      if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
        raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
                                   " hypervisor" % self.op.hvm_nic_type)
      if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
        raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
                                   " hypervisor" % self.op.hvm_disk_type)

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    ht_kind = self.cfg.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    if self.op.vnc_bind_address is None:
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.cfg.GetFileStorageDir(),
                                        string_file_storage_dir, instance))


    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            hvm_acpi=self.op.hvm_acpi,
                            hvm_pae=self.op.hvm_pae,
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
                            vnc_bind_address=self.op.vnc_bind_address,
                            hvm_nic_type=self.op.hvm_nic_type,
                            hvm_disk_type=self.op.hvm_disk_type,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      _RemoveDisks(iobj, self.cfg)
      self.cfg.ReleaseDRBDMinors(instance)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Declare that we don't want to remove the instance lock anymore, as we've
    # added the instance to the config
    del self.remove_locks[locking.LEVEL_INSTANCE]
    # Remove the temp. assignments for the instance's drbds
    self.cfg.ReleaseDRBDMinors(instance)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      # Make sure the instance lock gets removed
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                                src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor(self.cfg)
    console_cmd = hyper.GetShellCommandForConsole(instance)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
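
  # Illustrative note (not in the original source): the LU does not open the
  # console itself; it returns an ssh command line that the client runs from
  # the master, wrapping the hypervisor-specific console command (for the
  # Xen hypervisors this is typically an "xm console <instance>" invocation)
  # executed as root on the instance's primary node.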


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    elif self.op.remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.op.remote_node = remote_node
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
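
  # Locking sketch (illustrative, not in the original source): the three
  # branches above correspond to
  #   - iallocator given: lock all nodes (locking.ALL_SET);
  #   - explicit new secondary: lock that node and, via LOCKS_APPEND and
  #     DeclareLocks below, the instance's own primary/secondary nodes;
  #   - neither given: lock only the instance's own nodes (LOCKS_REPLACE).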

  def DeclareLocks(self, level):
    # If we're not already locking all nodes in the set we have to declare the
    # instance's primary/secondary nodes.
    if (level == locking.LEVEL_NODE and
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
      self._LockInstancesNodes()

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self.cfg,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    logger.ToStdout("Selected new secondary for the instance: %s" %
                    self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
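
  # Mode resolution summary (illustrative, not in the original source): for
  # the drbd8 template CheckPrereq ends up with
  #   - REPLACE_DISK_PRI: tgt_node = primary (where LVs are rebuilt) and
  #     oth_node = secondary (used as the consistency reference);
  #   - REPLACE_DISK_SEC: tgt_node = current secondary, oth_node = primary,
  #     and new_node optionally names a replacement secondary;
  # while REPLACE_DISK_ALL is rejected (or converted to _SEC when a new
  # secondary was given).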

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s" % new_lv,
                    hint="manually cleanup unused logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue
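
  # Rename scheme sketch (illustrative comment, not in the original source):
  # during step 4 the old data/meta LVs are renamed to
  # "<old-name>_replaced-<time_t>" and the freshly created LVs take over the
  # old names (and logical_ids), so the DRBD device ends up attached to new
  # storage under unchanged identifiers; the renamed leftovers are deleted in
  # step 6 once the resync has been verified.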

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary