Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib.py @ 3656b3af

History | View | Annotate | Download (185 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import locking
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import serializer
45

    
46

    
47
class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_MASTER: the LU needs to run on the master node
        REQ_WSSTORE: the LU needs a writable SimpleStore
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_MASTER = True
  REQ_WSSTORE = False
  REQ_BGL = True

  def __init__(self, processor, op, context, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    @param processor: the mcpu processor driving this LU
    @param op: the opcode this LU carries out; every attribute named in
        _OP_REQP must be present and not None on it
    @param context: object providing the cluster configuration (as .cfg)
    @param sstore: SimpleStore instance for cluster-wide parameters

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.sstore = sstore
    self.context = context
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    # 0 = exclusive acquisition at that locking level (the default)
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None

    # Reject opcodes missing any parameter the LU declared as required
    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    if not self.cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    if self.REQ_MASTER:
      master = sstore.GetMasterNode()
      if master != utils.HostInfo().name:
        raise errors.OpPrereqError("Commands must be run on the master"
                                   " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object, creating it lazily on first access.

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.sstore)
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:
      - Use an empty dict if you don't need any lock
      - If you don't need any lock at a particular level omit that level
      - Don't put anything for the BGL level
      - If you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples:
    # Acquire all nodes and one instance
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
    }
    # Acquire just two nodes
    self.needed_locks = {
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
    }
    # Acquire no locks
    self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    @param feedback_fn: callable used to report progress back to the caller

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-node tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    Args:
      phase: the hooks phase that has just been run
      hook_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: the previous result this LU had, or None in the PRE phase.

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to:

    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    # Consume the marker so a second call without re-arming it will assert
    del self.recalculate_locks[locking.LEVEL_NODE]
    
312

    
313
class NoHooksLU(LogicalUnit):
  """Base class for logical units that run no hooks.

  Deriving from this class (rather than from LogicalUnit directly)
  avoids repeating the hook-disabling boilerplate in every hook-less LU.

  """
  HPATH = None
  HTYPE = None

    
323

    
324
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    lu: the LU on whose behalf we expand (provides lu.cfg)
    nodes: List of nodes (strings) or None for all

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  expanded = []
  for node_name in nodes:
    full_name = lu.cfg.ExpandNodeName(node_name)
    if full_name is None:
      raise errors.OpPrereqError("No such node name '%s'" % node_name)
    expanded.append(full_name)

  return utils.NiceSort(expanded)
346

    
347

    
348
def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    lu: the LU on whose behalf we expand (provides lu.cfg)
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if not instances:
    # No names given: the caller wants every instance in the cluster
    wanted = lu.cfg.GetInstanceList()
  else:
    wanted = []
    for instance_name in instances:
      full_name = lu.cfg.ExpandInstanceName(instance_name)
      if full_name is None:
        raise errors.OpPrereqError("No such instance name '%s'" %
                                   instance_name)
      wanted.append(full_name)

  return utils.NiceSort(wanted)
370

    
371

    
372
def _CheckOutputFields(static, dynamic, selected):
373
  """Checks whether all selected fields are valid.
374

375
  Args:
376
    static: Static fields
377
    dynamic: Dynamic fields
378

379
  """
380
  static_fields = frozenset(static)
381
  dynamic_fields = frozenset(dynamic)
382

    
383
  all_fields = static_fields | dynamic_fields
384

    
385
  if not all_fields.issuperset(selected):
386
    raise errors.OpPrereqError("Unknown output fields selected: %s"
387
                               % ",".join(frozenset(selected).
388
                                          difference(all_fields)))
389

    
390

    
391
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
392
                          memory, vcpus, nics):
393
  """Builds instance related env variables for hooks from single variables.
394

395
  Args:
396
    secondary_nodes: List of secondary nodes as strings
397
  """
398
  env = {
399
    "OP_TARGET": name,
400
    "INSTANCE_NAME": name,
401
    "INSTANCE_PRIMARY": primary_node,
402
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
403
    "INSTANCE_OS_TYPE": os_type,
404
    "INSTANCE_STATUS": status,
405
    "INSTANCE_MEMORY": memory,
406
    "INSTANCE_VCPUS": vcpus,
407
  }
408

    
409
  if nics:
410
    nic_count = len(nics)
411
    for idx, (ip, bridge, mac) in enumerate(nics):
412
      if ip is None:
413
        ip = ""
414
      env["INSTANCE_NIC%d_IP" % idx] = ip
415
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
416
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
417
  else:
418
    nic_count = 0
419

    
420
  env["INSTANCE_NIC_COUNT"] = nic_count
421

    
422
  return env
423

    
424

    
425
def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override

  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    # Pass the instance's run status; this previously read instance.os,
    # a copy-paste of the 'os_type' entry above, so INSTANCE_STATUS ended
    # up holding the OS name instead of the up/down status.
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)
445

    
446

    
447
def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  Raises errors.OpPrereqError if any bridge used by the instance's NICs
  is missing on its primary node.

  """
  bridges = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, bridges):
    raise errors.OpPrereqError("one or more target bridges %s does not"
                               " exist on destination node '%s'" %
                               (bridges, instance.primary_node))
457

    
458

    
459
class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    # Every node except the master must already have been removed
    nodes = self.cfg.GetNodeList()
    if len(nodes) != 1 or nodes[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodes) - 1))

    # ...and no instances may remain either
    instances = self.cfg.GetInstanceList()
    if instances:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instances))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    Returns the name of the (former) master node.

    """
    master = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")

    # Keep backup copies of the GANETI_RUNAS user's SSH key files
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    for key_file in (priv_key, pub_key):
      utils.CreateBackup(key_file)

    return master
495

    
496

    
497
class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    """Lock all nodes and instances (shared) for verification.

    """
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    # all locks at all levels are acquired shared (value 1)
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    Returns True if any check failed ("bad"), False otherwise.

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        # NOTE: this loop shadows the 'node' parameter with the peer name
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        # NOTE: this loop also shadows the 'node' parameter
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result['node-net-test'][node]))

    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    Returns True if any check failed ("bad"), False otherwise.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    # every logical volume the config says should exist must be reported
    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    # a non-down instance must actually be running on its primary node
    if not instanceconfig.status == 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    # ...and must not be running anywhere else
    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    Returns True if any orphan volume was found ("bad"), False otherwise.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    Returns True if any orphan instance was found ("bad"), False otherwise.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    Returns True if any node lacks the needed memory ("bad"), False otherwise.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          needed_mem += instance_cfg[instance].memory
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    Returns True if the whole verification succeeded, False otherwise.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    # gather all remote data up-front with multi-node rpc calls
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo]
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        # a string result means the LVM query itself failed on the node
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      # NOTE: this rebinds 'nodeinfo', shadowing the list built above
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result =  self._VerifyInstance(instance, inst_config, node_volume,
                                     node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result, handle it, and send some
    nicely-formatted feedback back to the user.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: previous Exec result

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn("    Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result
919

    
920

    
921
class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    # we need to look at all nodes and all instances, in shared mode
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    Returns:
      a 4-tuple (nodes that failed the volume query, {node: lvm error
      string}, instance names with offline LVs, {instance name: list of
      (node, lv) keys that are missing})

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    # build the (node, vol) -> instance reverse map, restricted to
    # running, network-mirrored instances (the only ones checked here)
    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
        # FIX: a string result is an error report, not a volume map;
        # without this continue we'd fall through and call .iteritems()
        # on a string below, raising AttributeError
        continue
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      # remove every LV we found from nv_dict; whatever remains at the
      # end is missing on its node
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result
999

    
1000

    
1001
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]
  REQ_WSSTORE = True

  def BuildHooksEnv(self):
    """Build hooks env.

    Runs on the master node only, both before and after the rename.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    Resolves the new name, ensures that either the name or the IP
    actually changes, and that the new IP is not already live on the
    network. Stores the resolved IP in self.ip and the canonical name
    back into self.op.name.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      # a reachable new IP means something else already owns it
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    Stops the master role, rewrites the ssconf keys, pushes them to all
    other nodes, and restarts the master role even if the copy failed.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          # per-node failures are logged but do not abort the rename
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      # always try to restore the master role, whatever happened above
      if not rpc.call_node_start_master(master, False):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")
1078

    
1079

    
1080
def _RecursiveCheckIfLVMBased(disk):
  """Check whether a disk (or any of its children) is LVM-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether a LD_LV dev_type was found or not

  """
  # descend into the children first; any LVM child decides the answer
  for child in (disk.children or []):
    if _RecursiveCheckIfLVMBased(child):
      return True
  # no LVM descendant: the disk itself is the deciding factor
  return disk.dev_type == constants.LD_LV
1095

    
1096

    
1097
class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    Runs on the master node only.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    # an empty/None vg_name means "disable LVM"; that is only allowed
    # when no instance has any LVM-backed disk left
    if not self.op.vg_name:
      instances = [self.cfg.GetInstanceInfo(name)
                   for name in self.cfg.GetInstanceList()]
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      node_list = self.cfg.GetNodeList()
      vglist = rpc.call_vg_list(node_list)
      for node in node_list:
        # vgstatus is None/empty on success, an error string otherwise
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name != self.cfg.GetVGName():
      self.cfg.SetVGName(self.op.vg_name)
    else:
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")
1152

    
1153

    
1154
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  Args:
    cfgw: config object, used to set the disks' physical IDs
    instance: the instance whose disks are polled on its primary node
    proc: object providing LogInfo/LogWarning feedback methods
    oneshot: if True, report the current status once and return
    unlock: unused in this function -- TODO confirm it can be dropped

  Returns:
    True if no disk ended up degraded, False otherwise

  Raises:
    errors.RemoteError: after ten consecutive failed status queries

  """
  if not instance.disks:
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      # transient failure: back off and retry, give up after 10 in a row
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      # perc_done is None once the device has finished syncing
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    # NOTE(review): if no device reported an estimate, max_time stays 0
    # and this becomes sleep(0), i.e. a near-busy poll -- confirm intended
    time.sleep(min(60, max_time))

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
1210

    
1211

    
1212
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  Returns True when the device (and, recursively, its children) report
  a healthy status on the given node, False otherwise.

  """
  cfgw.SetDiskID(dev, node)
  # index into the rpc.call_blockdev_find result: 5 is the is_degraded
  # flag, 6 is the ldisk (local storage) flag
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      # no status at all counts as inconsistent
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      # NOTE(review): ldisk is not propagated here, so children are
      # always checked via is_degraded -- confirm this is intentional
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result
1239

    
1240

    
1241
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    # only a full query is supported; a name filter is rejected early
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
    _CheckOutputFields(static=[],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

      Args:
        node_list: a list with the names of all nodes
        rlist: a map with node names as keys and OS objects as values

      Returns:
        map: a map with osnames as keys and as value another map, with
             nodes as
             keys and list of OS objects as values
             e.g. {"debian-etch": {"node1": [<object>,...],
                                   "node2": [<object>,]}
                  }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      # skip nodes that returned nothing
      if not nr:
        continue
      for os_obj in nr:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    Returns one row per OS name, with the requested output fields.

    """
    node_list = self.acquired_locks[locking.LEVEL_NODE]
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          # an OS is valid only if present and valid on every node
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output
1325

    
1326

    
1327
class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      # FIX: converted from the legacy "raise Exception, args" comma
      # syntax to the call form used everywhere else in this module
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    # the node may not be used by any instance, in any role
    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    # drop the node from the cluster configuration/context first
    self.context.RemoveNode(node.name)

    # then tell the node itself to clean up and leave
    rpc.call_node_leave_cluster(node.name)
1394

    
1395

    
1396
class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    # fields answerable only by querying the nodes live
    self.dynamic_fields = frozenset([
      "dtotal", "dfree",
      "mtotal", "mnode", "mfree",
      "bootid",
      "ctotal",
      ])

    # fields answerable from the configuration alone
    self.static_fields = frozenset([
      "name", "pinst_cnt", "sinst_cnt",
      "pinst_list", "sinst_list",
      "pip", "sip", "tags",
      ])

    _CheckOutputFields(static=self.static_fields,
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in the _GetWantedNodes,
    # if non empty, and if empty, there's no validation to do
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    Returns one row per node, with the requested output fields.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    else:
      nodenames = all_info.keys()
    nodelist = [all_info[name] for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      # FIX: this used dict.fromkeys(nodenames, {}), which makes every
      # key share ONE dict object; build an independent dict per node
      live_data = dict([(name, {}) for name in nodenames])

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output
1522

    
1523

    
1524
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False

  def ExpandNames(self):
    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    # no node filter means all nodes
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    Returns one row per volume per node, with the requested fields.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    # map: instance object -> {node: [lv names]}, used to find the
    # instance owning a given volume
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      # skip nodes that failed or reported no volumes
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            # find the owning instance; '-' if the volume is unowned
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output
1601

    
1602

    
1603
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) matches the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    # the secondary IP defaults to the primary one (single-homed node)
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      # a readd must keep the same IP configuration the node had before
      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachablity
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    # read all key files; keyarray ends up parallel to keyfiles
    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    # for dual-homed nodes, verify the node itself can reach its own
    # secondary IP
    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    node_verify_list = [self.sstore.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
    }

    result = rpc.call_node_verify(node_verify_list, node_verify_param)
    for verifier in node_verify_list:
      if not result[verifier]:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                 " for remote verification" % verifier)
      if result[verifier]['nodelist']:
        for failed in result[verifier]['nodelist']:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, result[verifier]['nodelist'][failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        # per-node copy failures are logged but not fatal
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    # copy the ssconf files (plus the VNC password for HVM clusters)
    to_copy = self.sstore.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      result = rpc.call_upload_file([node], fname)
      if not result[node]:
        logger.Error("could not copy file %s to node %s" % (fname, node))

    if self.op.readd:
      self.context.ReaddNode(new_node)
    else:
      self.context.AddNode(new_node)
1804

    
1805

    
1806
class LUQueryClusterInfo(NoHooksLU):
  """Query the static cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False
  REQ_BGL = False

  def ExpandNames(self):
    # reading the static configuration requires no locks at all
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Assemble and return the cluster configuration summary.

    """
    sstore = self.sstore
    info = {}
    info["name"] = sstore.GetClusterName()
    info["software_version"] = constants.RELEASE_VERSION
    info["protocol_version"] = constants.PROTOCOL_VERSION
    info["config_version"] = constants.CONFIG_VERSION
    info["os_api_version"] = constants.OS_API_VERSION
    info["export_version"] = constants.EXPORT_VERSION
    info["master"] = sstore.GetMasterNode()
    info["architecture"] = (platform.architecture()[0], platform.machine())
    info["hypervisor_type"] = sstore.GetHypervisorType()
    return info
1840

    
1841

    
1842
class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    # dumping the configuration is read-only and needs no locks
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    # the actual serialization is delegated to the ConfigWriter
    return self.cfg.DumpConfig()
1863

    
1864

    
1865
class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # node locks are computed later (DeclareLocks), once the instance's
    # node list is known
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Activate the disks.

    Returns the device mapping produced by _AssembleInstanceDisks;
    raises OpExecError if any disk could not be assembled.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info
1900

    
1901

    
1902
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    cfg: a ConfigWriter instance, used to set the disk IDs per node
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    A (disks_ok, device_info) tuple: disks_ok is False if any assemble
    failed (modulo ignore_secondaries), and device_info is a list of
    (primary_node, instance_visible_name, assemble_result) triples with
    the mapping from node devices to instance devices.
  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        # primary-node failures are never ignored
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info
1962

    
1963

    
1964
def _StartInstanceDisks(cfg, instance, force):
  """Assemble the disks of an instance, aborting on failure.

  On any assemble failure the already-started disks are shut down
  again and an OpExecError is raised.

  """
  assembled, _ = _AssembleInstanceDisks(instance, cfg,
                                        ignore_secondaries=force)
  if assembled:
    return
  # roll back whatever was brought up before reporting the failure
  _ShutdownInstanceDisks(instance, cfg)
  if not (force is None or force):
    logger.Error("If the message above refers to a secondary node,"
                 " you can retry the operation using '--force'.")
  raise errors.OpExecError("Disk consistency error")
1976

    
1977

    
1978
class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # node locks are computed later (DeclareLocks), once the instance's
    # node list is known
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks

    The safe variant refuses to shut down the disks while the instance
    is still reported as running on its primary node.

    """
    instance = self.instance
    _SafeShutdownInstanceDisks(instance, self.cfg)
2010

    
2011

    
2012
def _SafeShutdownInstanceDisks(instance, cfg):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  Args:
    instance: the instance whose disks should be shut down
    cfg: a ConfigWriter instance, passed through to
         _ShutdownInstanceDisks

  Raises:
    OpExecError: if the primary node cannot be contacted or the
      instance is still running on it

  """
  # query the primary node for the list of instances it is running
  ins_l = rpc.call_instance_list([instance.primary_node])
  ins_l = ins_l[instance.primary_node]
  # a non-list answer means the RPC call failed
  # (idiom fix: was "not type(ins_l) is list")
  if not isinstance(ins_l, list):
    raise errors.OpExecError("Can't contact node '%s'" %
                             instance.primary_node)

  if instance.name in ins_l:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")

  _ShutdownInstanceDisks(instance, cfg)
2030

    
2031

    
2032
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, failures on the primary node do not affect
  the return value; failures on any other node always make the function
  return False.

  Args:
    instance: the instance whose disks should be shut down
    cfg: a ConfigWriter instance, used to set the disk IDs per node
    ignore_primary: whether shutdown failures on the primary node
                    are tolerated

  Returns:
    True if every shutdown (modulo ignore_primary) succeeded,
    False otherwise

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        # only a primary-node failure with ignore_primary set is forgiven
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result
2051

    
2052

    
2053
def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Ensure a node can provide the requested amount of free memory.

  The node is queried over RPC for its resource information; an
  OpPrereqError is raised if the node cannot be contacted, if it does
  not report a usable 'memory_free' value, or if it has less free
  memory than requested.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  vg_name = cfg.GetVGName()
  nodeinfo = rpc.call_node_info([node], vg_name)
  valid_answer = nodeinfo and isinstance(nodeinfo, dict)
  if not valid_answer:
    raise errors.OpPrereqError("Could not contact node %s for resource"
                             " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                             " was '%s'" % (node, free_mem))
  if free_mem < requested:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                             " needed %s MiB, available %s MiB" %
                             (node, reason, requested, free_mem))
2081

    
2082

    
2083
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # node locks are computed later (DeclareLocks), once the instance's
    # node list is known
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    # refuse to start if the primary node lacks the instance's memory
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory)

  def Exec(self, feedback_fn):
    """Start the instance.

    Raises OpExecError if the node daemon could not start the instance;
    in that case the disks are shut down again.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    # mark the instance as up in the config before actually starting it
    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")
2149

    
2150

    
2151
class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  Depending on the requested reboot type this either asks the node
  daemon to reboot the instance (soft/hard), or performs a full
  stop/start cycle including the instance's disks.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    # validate the reboot type before acquiring any locks
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Only a full reboot touches the disks on the secondary nodes,
      # so for soft/hard reboots it is enough to lock the primary node.
      # BUGFIX: this used to be "not constants.INSTANCE_REBOOT_FULL",
      # a constant expression that always evaluated to False and thus
      # always locked all of the instance's nodes.
      primary_only = self.op.reboot_type != constants.INSTANCE_REBOOT_FULL
      self._LockInstancesNodes(primary_only=primary_only)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    Soft/hard reboots are delegated to the node daemon; a full reboot
    stops the instance and its disks and starts them again.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      # full reboot: stop instance + disks, then start them again
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)
2230

    
2231

    
2232
class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # node locks are computed later (DeclareLocks), once the instance's
    # node list is known
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    A failed instance shutdown is only logged (not raised), and the
    disks are shut down afterwards in either case.

    """
    instance = self.instance
    node_current = instance.primary_node
    # mark the instance as down in the config before stopping it
    self.cfg.MarkInstanceDown(instance.name)
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(instance, self.cfg)
2282

    
2283

    
2284
class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # node locks are computed later (DeclareLocks), once the instance's
    # node list is known
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        # BUGFIX: this used to interpolate self.op.pnode, an attribute
        # the reinstall opcode does not carry, which would have raised
        # an AttributeError instead of the intended OpPrereqError
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node"  % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    Optionally switches the instance's OS first, then runs the OS
    create scripts on the primary node with the disks activated.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      # always release the disks, even if the OS install failed
      _ShutdownInstanceDisks(inst, self.cfg)
2370

    
2371

    
2372
class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running,
    and that the new name resolves and is not already taken (by another
    instance or, unless ignore_ip is set, by a live IP).

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    Renames the instance in the configuration, updates the instance
    lock, renames the file-storage directory if applicable, and runs
    the OS rename script on the primary node.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      # remember the old storage directory before the config rename
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
                                                old_file_storage_dir,
                                                new_file_storage_dir)

      if not result:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                          "sda", "sdb"):
        # rename-script failure is only logged: the config rename is done
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)
2475

    
2476

    
2477
class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # node locks are computed later (DeclareLocks), once the instance's
    # node list is known
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs only on the master node.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    Shuts the instance down, removes its disks and finally drops it
    from the configuration; with ignore_failures set, shutdown/disk
    errors only produce warnings.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    if not _RemoveDisks(instance, self.cfg):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)
    # also drop the now-stale instance lock
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2542

    
2543

    
2544
class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    # dynamic fields require live RPC queries; static ones come from
    # the configuration only
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    self.static_fields = frozenset([
      "name", "os", "pnode", "snodes",
      "admin_state", "admin_ram",
      "disk_template", "ip", "mac", "bridge",
      "sda_size", "sdb_size", "vcpus", "tags",
      "auto_balance",
      "network_port", "kernel_path", "initrd_path",
      "hvm_boot_order", "hvm_acpi", "hvm_pae",
      "hvm_cdrom_image_path", "hvm_nic_type",
      "hvm_disk_type", "vnc_bind_address",
      ])
    _CheckOutputFields(static=self.static_fields,
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    # queries are read-only, so shared locks suffice
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    # locking is only needed when live (dynamic) data was requested
    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.do_locking:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    Returns one row (list of field values, in output_fields order) per
    selected instance.

    """
    all_info = self.cfg.GetAllInstancesInfo()
    if self.do_locking:
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
    else:
      instance_names = all_info.keys()
    instance_list = [all_info[iname] for iname in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          # a False answer means the RPC to that node failed
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          # None means "unknown" (node down), not "stopped"
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          # combined admin + operational state
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          # field[:3] is the disk name ("sda"/"sdb")
          disk = instance.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        elif field == "vcpus":
          val = instance.vcpus
        elif field == "tags":
          val = list(instance.GetTags())
        elif field in ("network_port", "kernel_path", "initrd_path",
                       "hvm_boot_order", "hvm_acpi", "hvm_pae",
                       "hvm_cdrom_image_path", "hvm_nic_type",
                       "hvm_disk_type", "vnc_bind_address"):
          val = getattr(instance, field, None)
          if val is not None:
            pass
          elif field in ("hvm_nic_type", "hvm_disk_type",
                         "kernel_path", "initrd_path"):
            val = "default"
          else:
            val = "-"
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output
2703

    
2704

    
2705
class LUFailoverInstance(LogicalUnit):
  """Failover an instance to its secondary node.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    # lock the instance; node locks are computed once the instance is known
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # failover only makes sense for network-mirrored disks
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    snodes = instance.secondary_nodes
    if not snodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")
    tgt_node = snodes[0]

    # the target node must be able to host the instance's memory
    _CheckNodeFreeMemory(self.cfg, tgt_node, "failing over instance %s" %
                         instance.name, instance.memory)

    # every bridge the instance uses must exist on the target node
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(tgt_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, tgt_node))

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance
    src_node = instance.primary_node
    tgt_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for disk in instance.disks:
      # for drbd, these are drbd over lvm
      consistent = _CheckDiskConsistency(self.cfg, disk, tgt_node, False)
      if not consistent:
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % disk.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, src_node))

    if not rpc.call_instance_shutdown(src_node, instance):
      # with ignore_consistency we proceed anyway, otherwise abort
      if not self.op.ignore_consistency:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))
      logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                   " anyway. Please make sure node %s is down"  %
                   (instance.name, src_node, src_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = tgt_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logger.Info("Starting instance %s on node %s" %
                  (instance.name, tgt_node))

      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not rpc.call_instance_start(tgt_node, instance, None):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, tgt_node))
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  # depth-first: all children must exist before the parent device
  for child in device.children or []:
    if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
      return False

  cfg.SetDiskID(device, node)
  created_id = rpc.call_blockdev_create(node, device, device.size,
                                        instance.name, True, info)
  if not created_id:
    return False
  if device.physical_id is None:
    device.physical_id = created_id
  return True
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  # a device requiring secondary creation forces it for the whole subtree
  force = device.CreateOnSecondary() or force
  for child in device.children or []:
    if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                      child, force, info):
      return False

  if not force:
    # nothing to create at this level
    return True
  cfg.SetDiskID(device, node)
  created_id = rpc.call_blockdev_create(node, device, device.size,
                                        instance.name, False, info)
  if not created_id:
    return False
  if device.physical_id is None:
    device.physical_id = created_id
  return True
def _GenerateUniqueNames(cfg, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance.

  """
  # one fresh cluster-unique ID per requested extension
  return ["%s%s" % (cfg.GenerateUniqueID(), ext) for ext in exts]
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
  """Generate a drbd8 device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  # data LV sized as requested; metadata LV uses a fixed 128MB
  data_dev = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  meta_dev = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  return objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                      logical_id=(primary, secondary, port),
                      children=[data_dev, meta_dev],
                      iv_name=iv_name)
def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()

  if template_name == constants.DT_DISKLESS:
    return []

  if template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")
    lv_names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                       logical_id=(vgname, lv_names[0]),
                       iv_name="sda")
    sdb = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                       logical_id=(vgname, lv_names[1]),
                       iv_name="sdb")
    return [sda, sdb]

  if template_name == constants.DT_DRBD8:
    # drbd needs exactly one secondary (the mirror peer)
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    lv_names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                          ".sdb_data", ".sdb_meta"])
    sda = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                               disk_sz, lv_names[0:2], "sda")
    sdb = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                               swap_sz, lv_names[2:4], "sdb")
    return [sda, sdb]

  if template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")
    sda = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
                       iv_name="sda", logical_id=(file_driver,
                       "%s/sda" % file_storage_dir))
    sdb = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
                       iv_name="sdb", logical_id=(file_driver,
                       "%s/sdb" % file_storage_dir))
    return [sda, sdb]

  raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % (instance.name,)
def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  # file-based instances need their storage directory created up front
  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = rpc.call_file_storage_dir_create(instance.primary_node,
                                              file_storage_dir)

    if not result:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False

    if not result[0]:
      logger.Error("failed to create directory '%s'" % file_storage_dir)
      return False

  for disk in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (disk.iv_name, instance.name))
    #HARDCODE
    # secondaries first, then the primary node
    for snode in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, snode, instance,
                                        disk, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (disk.iv_name, disk, snode))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, disk, info):
      logger.Error("failed to create volume %s on primary!" %
                   disk.iv_name)
      return False

  return True
def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  success = True
  for disk in instance.disks:
    # walk the whole tree of the disk, on all involved nodes
    for node, subdev in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(subdev, node)
      if not rpc.call_blockdev_remove(node, subdev):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (disk.iv_name, node))
        success = False

  # for file storage, also drop the directory holding the images
  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
                                            file_storage_dir):
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      success = False

  return success
def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group

  This is currently hard-coded for the two-drive layout.

  """
  # Required free disk space as a function of disk and swap space
  size_by_template = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    # 256 MB are added for drbd metadata, 128MB for each drbd device
    constants.DT_DRBD8: disk_size + swap_size + 256,
    constants.DT_FILE: None,
  }

  try:
    return size_by_template[disk_template]
  except KeyError:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" %  disk_template)
class LUCreateInstance(LogicalUnit):
3072
  """Create an instance.
3073

3074
  """
3075
  HPATH = "instance-add"
3076
  HTYPE = constants.HTYPE_INSTANCE
3077
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
3078
              "disk_template", "swap_size", "mode", "start", "vcpus",
3079
              "wait_for_sync", "ip_check", "mac"]
3080
  REQ_BGL = False
3081

    
3082
  def _ExpandNode(self, node):
    """Expands and checks one node name.

    """
    expanded = self.cfg.ExpandNodeName(node)
    if expanded is None:
      raise errors.OpPrereqError("Unknown node %s" % node)
    return expanded
  def ExpandNames(self):
    """ExpandNames for CreateInstance.

    Figure out the right locks for instance creation.

    """
    self.needed_locks = {}

    # set optional parameters to None if they don't exist; file_storage_dir
    # and file_driver are read unconditionally below (and in CheckPrereq and
    # Exec), so they must be defaulted here too or an opcode that omits them
    # would fail with AttributeError
    for attr in ["kernel_path", "initrd_path", "pnode", "snode",
                 "iallocator", "hvm_boot_order", "hvm_acpi", "hvm_pae",
                 "hvm_cdrom_image_path", "hvm_nic_type", "hvm_disk_type",
                 "vnc_bind_address", "file_storage_dir", "file_driver"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    # verify creation mode
    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)
    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    #### instance parameters check

    # instance name verification (resolves the name to a FQDN)
    hostname1 = utils.HostInfo(self.op.instance_name)
    self.op.instance_name = instance_name = hostname1.name

    # this is just a preventive check, but someone might still add this
    # instance in the meantime, and creation will fail at lock-add time
    if instance_name in self.cfg.GetInstanceList():
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    self.add_locks[locking.LEVEL_INSTANCE] = instance_name

    # ip validity checks
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = self.op.ip = inst_ip
    # used in CheckPrereq for ip ping check
    self.check_ip = hostname1.ip

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")
    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory path not absolute")

    ### Node/iallocator related checks
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator:
      # the allocator may pick any node, so lock them all
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.op.pnode = self._ExpandNode(self.op.pnode)
      nodelist = [self.op.pnode]
      if self.op.snode is not None:
        self.op.snode = self._ExpandNode(self.op.snode)
        nodelist.append(self.op.snode)
      self.needed_locks[locking.LEVEL_NODE] = nodelist

    # in case of import lock the source node too
    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)

      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      self.op.src_node = src_node = self._ExpandNode(src_node)
      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
        self.needed_locks[locking.LEVEL_NODE].append(src_node)

    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")
  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    Builds the IAllocator request from the opcode, runs the chosen
    allocator and stores the selected node(s) in self.op.pnode (and
    self.op.snode if two nodes were required).

    Raises:
      errors.OpPrereqError: if the allocator fails or returns a wrong
        number of nodes.

    """
    disks = [{"size": self.op.disk_size, "mode": "w"},
             {"size": self.op.swap_size, "mode": "w"}]
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
             "bridge": self.op.bridge}]
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.op.vcpus,
                     mem_size=self.op.mem_size,
                     disks=disks,
                     nics=nics,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      # FIX: the format string has three placeholders but the original
      # code only supplied two arguments, so raising this error itself
      # crashed with a TypeError; the allocator name is now included
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    logger.ToStdout("Selected nodes for the instance: %s" %
                    (", ".join(ial.nodes),))
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
                (self.op.instance_name, self.op.iallocator, ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    # imports additionally expose the source of the instance data
    if self.op.mode == constants.INSTANCE_IMPORT:
      env.update({
        "INSTANCE_SRC_NODE": self.op.src_node,
        "INSTANCE_SRC_PATH": self.op.src_path,
        "INSTANCE_SRC_IMAGE": self.src_image,
        })

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    ))

    # run on the master plus all nodes involved with the new instance
    nl = [self.sstore.GetMasterNode(), self.op.pnode] + self.secondaries
    return env, nl, nl
  def CheckPrereq(self):
    """Check prerequisites.

    Validates cluster capabilities, import source (if any), IP usage,
    node selection (possibly via the allocator), disk space, OS,
    bridges, memory and hypervisor-specific parameters, and computes
    self.instance_status.

    """
    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = self.op.src_node
      src_path = self.op.src_path

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage

    # ip ping checks (we use the same ip that was resolved in ExpandNames)

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
        # FIX: the original referenced a bare 'instance_name', which is not
        # bound in this method (only in ExpandNames), so hitting this branch
        # raised NameError instead of the intended error message
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (self.check_ip, self.op.instance_name))

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    #### allocator run

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
    assert self.pnode is not None, \
      "Cannot retrieve locked node %s" % self.op.pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if self.op.snode is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")
      if self.op.snode == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(self.op.snode)

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node"  % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")

    # bridge check on primary node
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.op.mem_size)

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      # FIXME (als): shouldn't these checks happen on the destination node?
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    # Xen HVM device type checks
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
        raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
                                   " hypervisor" % self.op.hvm_nic_type)
      if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
        raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
                                   " hypervisor" % self.op.hvm_disk_type)

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'
  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    Builds the NIC and disk configuration, registers the instance in
    the cluster config, creates its block devices, waits for the disks
    to be (at least not degraded in) sync, runs the OS create/import
    scripts and optionally starts the instance.  On disk-creation or
    sync failure the already-created disks (and the config entry) are
    rolled back and OpExecError is raised.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    # MAC address: either auto-generated by the config or user-supplied
    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    # some hypervisor types need a dedicated network port allocated
    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    if self.op.vnc_bind_address is None:
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.sstore.GetFileStorageDir(),
                                        string_file_storage_dir, instance))

    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            hvm_acpi=self.op.hvm_acpi,
                            hvm_pae=self.op.hvm_pae,
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
                            vnc_bind_address=self.op.vnc_bind_address,
                            hvm_nic_type=self.op.hvm_nic_type,
                            hvm_disk_type=self.op.hvm_disk_type,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      # partial creation is possible: remove whatever was created
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Declare that we don't want to remove the instance lock anymore, as we've
    # added the instance to the config
    del self.remove_locks[locking.LEVEL_INSTANCE]

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      # degraded disks: undo both the disk creation and the config entry
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      # Make sure the instance lock gets removed
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                                src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")
3549
class LUConnectConsole(NoHooksLU):
  """Return the command line for accessing an instance's console.

  This LU is somewhat special: it performs no cluster modification,
  but computes and returns the command that must be run on the master
  node in order to attach to the instance's console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    inst = self.instance
    pnode = inst.primary_node

    # ask the primary node which instances it is currently running
    running = rpc.call_instance_list([pnode])[pnode]
    if running is False:
      raise errors.OpExecError("Can't connect to node %s." % pnode)

    if inst.name not in running:
      raise errors.OpExecError("Instance %s is not running." % inst.name)

    logger.Debug("connecting to console of %s on %s" % (inst.name, pnode))

    console_cmd = hypervisor.GetHypervisor().GetShellCommandForConsole(inst)

    # wrap the hypervisor's console command in an interactive ssh call
    return self.ssh.BuildCmd(pnode, "root", console_cmd, batch=True, tty=True)
3596
class LUReplaceDisks(LogicalUnit):
3597
  """Replace the disks of an instance.
3598

3599
  """
3600
  HPATH = "mirrors-replace"
3601
  HTYPE = constants.HTYPE_INSTANCE
3602
  _OP_REQP = ["instance_name", "mode", "disks"]
3603
  REQ_BGL = False
3604

    
3605
  def ExpandNames(self):
    """Expand names and compute the node locks for the replacement."""
    self._ExpandAndLockInstance()

    # normalize an absent remote_node attribute to None
    self.op.remote_node = getattr(self.op, "remote_node", None)

    new_secondary = self.op.remote_node
    if getattr(self.op, "iallocator", None) is not None:
      # the allocator will pick the new secondary and we cannot know
      # in advance which nodes it will choose, so lock them all
      if new_secondary is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    elif new_secondary is not None:
      expanded = self.cfg.ExpandNodeName(new_secondary)
      if expanded is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.op.remote_node = expanded
      self.needed_locks[locking.LEVEL_NODE] = [expanded]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      # no new secondary: DeclareLocks will add the instance's own nodes
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3629
  def DeclareLocks(self, level):
    """Declare the node locks once the instance lock is held."""
    if level != locking.LEVEL_NODE:
      return
    # If we're not already locking all nodes in the set we have to declare
    # the instance's primary/secondary nodes.
    if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
      self._LockInstancesNodes()
3636
  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    On success this sets self.op.remote_node to the node chosen by the
    allocator.  Raises OpPrereqError if the allocator run fails or if
    it returns an unexpected number of nodes.

    """
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      # FIX: the format string has three placeholders but the allocator
      # name was missing from the argument tuple, so this raise used to
      # die with a TypeError instead of the intended OpPrereqError
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    logger.ToStdout("Selected new secondary for the instance: %s" %
                    self.op.remote_node)
3659
  def BuildHooksEnv(self):
    """Build the environment for the disk-replacement hooks.

    The hooks run on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    # instance-derived variables are merged in afterwards, so they win
    # on any key collision (dict.update semantics)
    env.update(_BuildInstanceHookEnvByObject(self.instance))

    node_list = [self.sstore.GetMasterNode(), self.instance.primary_node]
    if self.op.remote_node is not None:
      node_list.append(self.op.remote_node)
    return env, node_list, node_list
3679
  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster, that its disk
    template supports replacement, and normalizes the replacement mode;
    it also computes self.sec_node, self.remote_node_info and (for
    DRBD8) self.tgt_node / self.oth_node / self.new_node used by the
    Exec helpers.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

    # disk replacement only makes sense for network-mirrored templates
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    # if an allocator was requested, let it choose the new secondary
    # (it fills in self.op.remote_node); must run before the
    # remote_node checks below
    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        # replace the disks on the primary; the secondary is the peer
        # whose consistency we check against
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        # also guarded at opcode level; defensive check
        raise errors.ProgrammerError("Unhandled disk replace mode")

    # finally, verify that all requested disks actually exist
    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
3751
  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for dbrd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaces.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    # maps iv_name -> (drbd device, old LV children, new LV children)
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      # NOTE(review): meta LV size is hard-coded to 128 (presumably MiB,
      # the DRBD8 metadata size used elsewhere) — confirm against the
      # disk-template generation code
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      # keep the config objects in sync with the on-node renames
      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        # attach failed: best-effort removal of the freshly created LVs
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      # NOTE(review): index 5 of call_blockdev_find's result appears to be
      # the is_degraded flag — verify against the rpc/backend contract
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        # removal failures are non-fatal: warn and keep going
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue
3921
  def _ExecD8Secondary(self, feedback_fn):
3922
    """Replace the secondary node for drbd8.
3923

3924
    The algorithm for replace is quite complicated:
3925
      - for all disks of the instance:
3926
        - create new LVs on the new node with same names
3927
        - shutdown the drbd device on the old secondary
3928
        - disconnect the drbd network on the primary
3929
        - create the drbd device on the new secondary
3930
        - network attach the drbd on the primary, using an artifice:
3931
          the drbd code for Attach() will connect to the network if it
3932
          finds a device which is connected to the good local disks but
3933
          not network enabled
3934
      - wait for sync across all devices
3935
      - remove all disks from the old secondary
3936

3937
    Failures are not very well handled.
3938

3939
    """
3940
    steps_total = 6
3941
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3942
    instance = self.instance
3943
    iv_names = {}
3944
    vgname = self.cfg.GetVGName()
3945
    # start of work
3946
    cfg = self.cfg
3947
    old_node = self.tgt_node
3948
    new_node = self.new_node
3949
    pri_node = instance.primary_node
3950

    
3951
    # Step: check device activation
3952
    self.proc.LogStep(1, steps_total, "check device existence")
3953
    info("checking volume groups")
3954
    my_vg = cfg.GetVGName()
3955
    results = rpc.call_vg_list([pri_node, new_node])
3956
    if not results:
3957
      raise errors.OpExecError("Can't list volume groups on the nodes")
3958
    for node in pri_node, new_node:
3959
      res = results.get(node, False)
3960
      if not res or my_vg not in res:
3961
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3962
                                 (my_vg, node))
3963
    for dev in instance.disks:
3964
      if not dev.iv_name in self.op.disks:
3965
        continue
3966
      info("checking %s on %s" % (dev.iv_name, pri_node))
3967
      cfg.SetDiskID(dev, pri_node)
3968
      if not rpc.call_blockdev_find(pri_node, dev):
3969
        raise errors.OpExecError("Can't find device %s on node %s" %
3970
                                 (dev.iv_name, pri_node))
3971

    
3972
    # Step: check other node consistency
3973
    self.proc.LogStep(2, steps_total, "check peer consistency")
3974
    for dev in instance.disks:
3975
      if not dev.iv_name in self.op.disks:
3976
        continue