Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib.py @ c53279cf

History | View | Annotate | Download (185.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import locking
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import serializer
45

    
46

    
47
class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_MASTER: the LU needs to run on the master node
        REQ_WSSTORE: the LU needs a writable SimpleStore
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_MASTER = True
  REQ_WSSTORE = False
  REQ_BGL = True

  def __init__(self, processor, op, context, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    Args:
      processor: the mcpu processor driving this LU
      op: the opcode being executed; every attribute named in _OP_REQP
        must be present and not None, otherwise OpPrereqError is raised
      context: execution context; must provide at least a 'cfg' attribute
      sstore: SimpleStore instance (queried for the master node name when
        REQ_MASTER is set)

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.sstore = sstore
    self.context = context
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    # 0 = exclusive, 1 = shared; everything defaults to exclusive
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None

    # Fail early if a required opcode parameter is absent or None
    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    if not self.cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    if self.REQ_MASTER:
      master = sstore.GetMasterNode()
      if master != utils.HostInfo().name:
        raise errors.OpPrereqError("Commands must be run on the master"
                                   " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object

    The runner is created lazily on first access and cached afterwards.

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.sstore)
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:
      - Use an empty dict if you don't need any lock
      - If you don't need any lock at a particular level omit that level
      - Don't put anything for the BGL level
      - If you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples:
    # Acquire all nodes and one instance
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
    }
    # Acquire just two nodes
    self.needed_locks = {
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
    }
    # Acquire no locks
    self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-node tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: the previous result this LU had, or None in the PRE phase.

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to:

    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we're really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    # One-shot flag: consume it so a stale mode can't affect a later call
    del self.recalculate_locks[locking.LEVEL_NODE]
311

    
312

    
313
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  # With HPATH left as None, BuildHooksEnv is never called for these LUs
  # (see LogicalUnit.BuildHooksEnv).
  HPATH = None
  HTYPE = None
322

    
323

    
324
def _GetWantedNodes(lu, nodes):
  """Return the checked and expanded list of node names.

  Args:
    nodes: List of nodes (strings) or None for all

  Raises errors.OpPrereqError if the argument is not a list or contains
  an unknown node name, and errors.ProgrammerError on an empty list.

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  expanded = []
  for node_name in nodes:
    full_name = lu.cfg.ExpandNodeName(node_name)
    if full_name is None:
      raise errors.OpPrereqError("No such node name '%s'" % node_name)
    expanded.append(full_name)

  return utils.NiceSort(expanded)
346

    
347

    
348
def _GetWantedInstances(lu, instances):
  """Return the checked and expanded list of instance names.

  Args:
    instances: List of instances (strings) or None for all

  An empty/None argument means "every instance in the configuration";
  otherwise each name is expanded and must be known, or
  errors.OpPrereqError is raised.

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if not instances:
    # No explicit selection: take the full configured instance list.
    return utils.NiceSort(lu.cfg.GetInstanceList())

  expanded = []
  for inst_name in instances:
    full_name = lu.cfg.ExpandInstanceName(inst_name)
    if full_name is None:
      raise errors.OpPrereqError("No such instance name '%s'" % inst_name)
    expanded.append(full_name)

  return utils.NiceSort(expanded)
370

    
371

    
372
def _CheckOutputFields(static, dynamic, selected):
373
  """Checks whether all selected fields are valid.
374

375
  Args:
376
    static: Static fields
377
    dynamic: Dynamic fields
378

379
  """
380
  static_fields = frozenset(static)
381
  dynamic_fields = frozenset(dynamic)
382

    
383
  all_fields = static_fields | dynamic_fields
384

    
385
  if not all_fields.issuperset(selected):
386
    raise errors.OpPrereqError("Unknown output fields selected: %s"
387
                               % ",".join(frozenset(selected).
388
                                          difference(all_fields)))
389

    
390

    
391
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
392
                          memory, vcpus, nics):
393
  """Builds instance related env variables for hooks from single variables.
394

395
  Args:
396
    secondary_nodes: List of secondary nodes as strings
397
  """
398
  env = {
399
    "OP_TARGET": name,
400
    "INSTANCE_NAME": name,
401
    "INSTANCE_PRIMARY": primary_node,
402
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
403
    "INSTANCE_OS_TYPE": os_type,
404
    "INSTANCE_STATUS": status,
405
    "INSTANCE_MEMORY": memory,
406
    "INSTANCE_VCPUS": vcpus,
407
  }
408

    
409
  if nics:
410
    nic_count = len(nics)
411
    for idx, (ip, bridge, mac) in enumerate(nics):
412
      if ip is None:
413
        ip = ""
414
      env["INSTANCE_NIC%d_IP" % idx] = ip
415
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
416
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
417
  else:
418
    nic_count = 0
419

    
420
  env["INSTANCE_NIC_COUNT"] = nic_count
421

    
422
  return env
423

    
424

    
425
def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override

  Returns:
    The environment dict produced by _BuildInstanceHookEnv.

  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    # Bug fix: this previously passed instance.os, so INSTANCE_STATUS
    # ended up holding the OS name (already exported as os_type) instead
    # of the instance's run status.
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)
445

    
446

    
447
def _CheckInstanceBridgesExist(instance):
  """Ensure all bridges needed by an instance exist.

  Asks the instance's primary node (via RPC) whether every bridge used
  by the instance's NICs is present, raising errors.OpPrereqError if not.

  """
  bridges = [nic.bridge for nic in instance.nics]
  # A false RPC result means at least one bridge is missing (or the call
  # itself failed); either way the prerequisite does not hold.
  if not rpc.call_bridges_exist(instance.primary_node, bridges):
    raise errors.OpPrereqError("one or more target bridges %s does not"
                               " exist on destination node '%s'" %
                               (bridges, instance.primary_node))
457

    
458

    
459
class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    The cluster can only be destroyed once it is empty: the master must
    be the sole remaining node, and no instances may be defined.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodes = self.cfg.GetNodeList()
    if not (len(nodes) == 1 and nodes[0] == master):
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodes) - 1))

    instances = self.cfg.GetInstanceList()
    if instances:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instances))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    Disables the master role on the master node, backs up the cluster
    SSH key pair, and returns the (former) master's name so the caller
    can finish the teardown there.

    """
    master = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")

    # Keep backup copies of the SSH keys before the cluster goes away.
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    for key_file in (priv_key, pub_key):
      utils.CreateBackup(key_file)

    return master
495

    
496

    
497
class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    # Verification is read-only: take every node and instance lock, shared.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums
      vglist: volume-group data returned by rpc.call_vg_list for this node
      node_result: dict returned by rpc.call_node_verify for this node
      remote_version: protocol version reported by the node, or a false
        value if the node could not be contacted
      feedback_fn: function used to report each problem found

    Returns True if any problem was found ("bad"), False otherwise.

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        # NOTE(review): the loop variable shadows the 'node' parameter from
        # here on; the later hypervisor message therefore reports the last
        # peer name, not the node under test.
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result['node-net-test'][node]))

    # NOTE(review): a hypervisor failure is reported but does not set 'bad'
    # here — presumably intentional (warning only); confirm.
    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    Args:
      instance: instance name
      instanceconfig: the instance's configuration object
      node_vol_is: dict node -> volumes actually present on that node
      node_instance: dict node -> list of instances running on that node
      feedback_fn: function used to report each problem found

    Returns True if any problem was found ("bad"), False otherwise.

    """
    bad = False

    node_current = instanceconfig.primary_node

    # Volumes the instance should have, grouped by node
    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    # An instance not configured as 'down' must be running on its primary
    if not instanceconfig.status == 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    # ...and must not be running anywhere else
    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    Returns True if any orphan volume was found ("bad"), False otherwise.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    Returns True if any unknown instance was found ("bad"), False otherwise.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    Returns True if some node cannot absorb its failovers ("bad").

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          needed_mem += instance_cfg[instance].memory
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    Returns True when the cluster verified cleanly, False otherwise.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    # Gather all remote data up front with multi-node RPC calls
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo]
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      # A string result carries the remote LVM error message
      if isinstance(volumeinfo, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info (NOTE: rebinds 'nodeinfo', shadowing the earlier list)
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result =  self._VerifyInstance(instance, inst_config, node_volume,
                                     node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result, handle it, and send some
    nicely-formatted feedback back to the user.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: previous Exec result

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn("    Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      # NOTE(review): this return is inside the POST branch, so in the PRE
      # phase the method falls through and returns None — confirm intended.
      return lu_result
919

    
920

    
921
class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    # all locks are taken in shared mode: this is a read-only check
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    Returns a tuple of four elements:
      - list of unreachable nodes
      - dict of node name -> LVM enumeration error message
      - list of instance names with offline logical volumes
      - dict of instance name -> list of missing (node, volume) pairs

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      # only running, network-mirrored instances are relevant here
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
        # FIX: without this continue the code fell through and called
        # .iteritems() on the error string below, raising AttributeError
        continue
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]
  REQ_WSSTORE = True

  def BuildHooksEnv(self):
    """Build hooks env.

    The hooks run on the master node only, both pre and post.

    """
    master_node = self.sstore.GetMasterNode()
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    return env, [master_node], [master_node]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    host_info = utils.HostInfo(self.op.name)

    new_name = host_info.name
    self.ip = new_ip = host_info.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    # a changed IP must not already be live on the network
    if (new_ip != old_ip and
        utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT)):
      raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                 " reachable on the network. Aborting." %
                                 new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    new_name = self.op.name
    new_ip = self.ip
    ss = self.sstore

    # the master IP must be down while the ssconf data is rewritten
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, new_ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, new_name)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        upload_result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not upload_result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      # best effort: always try to restore the master role
      if not rpc.call_node_start_master(master, False):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")
def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.dev_type == constants.LD_LV:
    return True
  # recurse into the children, if any
  for child in (disk.children or []):
    if _RecursiveCheckIfLVMBased(child):
      return True
  return False
class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    The hooks run on the master node only.

    """
    master = self.sstore.GetMasterNode()
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    return env, [master], [master]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    # FIXME: This only works because there is only one parameter that can be
    # changed or removed.
    if not self.op.vg_name:
      # disabling lvm storage is only allowed when no instance uses it
      for inst in self.cfg.GetAllInstancesInfo().values():
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      node_list = self.acquired_locks[locking.LEVEL_NODE]
      vglist = rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name == self.cfg.GetVGName():
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")
    else:
      self.cfg.SetVGName(self.op.vg_name)
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  Polls the primary node for the mirror status of all the instance's
  disks until every device reports done (or only once, with oneshot).

  Returns:
    True if no disk ended up degraded, False otherwise

  """
  if not instance.disks:
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      # tolerate up to 10 consecutive RPC failures before aborting
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for idx, mstat in enumerate(rstats):
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[idx].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        # a percentage value means this device is still syncing
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[idx].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    # sleep until the next status check, capped at one minute
    time.sleep(min(60, max_time))

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  cfgw.SetDiskID(dev, node)
  # index into the blockdev_find result: 5 is is_degraded, 6 is ldisk
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    # NOTE(review): children are always checked with the default
    # ldisk=False regardless of the caller's ldisk argument — confirm
    # whether this is intended
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
    _CheckOutputFields(static=[],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into an a per-os per-node dictionary

      Args:
        node_list: a list with the names of all nodes
        rlist: a map with node names as keys and OS objects as values

      Returns:
        map: a map with osnames as keys and as value another map, with
             nodes as keys and list of OS objects as values
             e.g. {"debian-etch": {"node1": [<object>,...],
                                   "node2": [<object>,]}
                  }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if not nr:
        continue
      for os_obj in nr:
        if os_obj.name not in all_os:
          # first occurrence of this OS: pre-create an empty entry for
          # every known node so missing nodes show up as []
          all_os[os_obj.name] = dict([(nname, []) for nname in node_list])
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.acquired_locks[locking.LEVEL_NODE]
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          # an OS is valid only if it validated on every node
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output
class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      # FIX: converted from the deprecated 'raise Class, args' statement
      # to the call form used by every other raise in this module
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    self.context.RemoveNode(node.name)

    rpc.call_node_leave_cluster(node.name)
class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    self.dynamic_fields = frozenset([
      "dtotal", "dfree",
      "mtotal", "mnode", "mfree",
      "bootid",
      "ctotal",
      ])

    self.static_fields = frozenset([
      "name", "pinst_cnt", "sinst_cnt",
      "pinst_list", "sinst_list",
      "pip", "sip", "tags",
      ])

    _CheckOutputFields(static=self.static_fields,
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    # node locks are only needed when live (dynamic) data was requested
    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
    if self.do_locking:
      self.needed_locks[locking.LEVEL_NODE] = self.wanted

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in the _GetWantedNodes,
    # if non empty, and if empty, there's no validation to do
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    else:
      nodenames = all_info.keys()
    nodelist = [all_info[name] for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if not nodeinfo:
          live_data[name] = {}
          continue
        live_data[name] = {
          "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
          "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
          "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
          "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
          "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
          "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
          "bootid": nodeinfo['bootid'],
          }
    else:
      # no live data requested; all entries share one empty dict, which
      # is safe since live_data is only ever read below
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      # map each node to the instances it hosts, primary and secondary
      for instance_name in self.cfg.GetInstanceList():
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False

  def ExpandNames(self):
    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)
    else:
      # no explicit node list given: query every node
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    # instance object -> {node: [lv, ...]} mapping, used for the
    # "instance" output field below
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            # find the instance (if any) owning this LV on this node
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_pre = self.cfg.GetNodeList()
    nodes_post = nodes_pre + [self.op.node_name, ]
    return env, nodes_pre, nodes_post

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) matches the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    hostinfo = utils.HostInfo(self.op.node_name)

    node = hostinfo.name
    primary_ip = self.op.primary_ip = hostinfo.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      # single-homed setup: the secondary IP defaults to the primary
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = self.cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = self.cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        # on readd, the node must keep its previous IP configuration
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachablity
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity and protocol compatibility first
    result = rpc.call_version([node])[node]
    if not result:
      raise errors.OpExecError("Cannot get version from the new node")
    if constants.PROTOCOL_VERSION != result:
      raise errors.OpExecError("Version mismatch master version %s,"
                               " node version %s" %
                               (constants.PROTOCOL_VERSION, result))
    logger.Info("communication to node %s fine, sw version %s match" %
                (node, result))

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    keyarray = []
    for keyfile in keyfiles:
      f = open(keyfile, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      # dual-homed node: make it confirm it can reach its own secondary ip
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    node_verify_list = [self.sstore.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
    }

    result = rpc.call_node_verify(node_verify_list, node_verify_param)
    for verifier in node_verify_list:
      if not result[verifier]:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                 " for remote verification" % verifier)
      if result[verifier]['nodelist']:
        for failed in result[verifier]['nodelist']:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, result[verifier]['nodelist'][failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    # finally copy the ssconf files (and the VNC password, if HVM)
    to_copy = self.sstore.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      result = rpc.call_upload_file([node], fname)
      if not result[node]:
        logger.Error("could not copy file %s to node %s" % (fname, node))

    if self.op.readd:
      self.context.ReaddNode(new_node)
    else:
      self.context.AddNode(new_node)
class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  Returns a dictionary of static cluster data (name, versions, master
  node, architecture, hypervisor type); no hooks and no locks needed.

  """
  _OP_REQP = []
  REQ_MASTER = False
  REQ_BGL = False

  def ExpandNames(self):
    # only static configuration data is read, so no locks are required
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      }

    return result
class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    # dumping the configuration needs no locks
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()
class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  Assembles the block devices of one instance on all its nodes and
  returns the device mapping information.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # the node locks are computed later, once the instance is known
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Activate the disks.

    Raises OpExecError if any block device could not be assembled.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    cfg: the cluster ConfigWriter instance
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    (disks_ok, device_info) where disks_ok is false if the operation
    failed, and device_info is a list of
    (host, instance_visible_name, node_visible_name) tuples with the
    mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occured, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info
def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  Assembles all of the instance's disks; on failure the devices that
  were already assembled are shut down again and OpExecError is raised.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    # force can be None (meaning "not applicable"); only suggest --force
    # when the caller passed an explicit false value
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")
class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  Safely shuts down the block devices of one instance (refusing to act
  if the instance is still running).

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # the node locks are computed later, once the instance is known
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    _SafeShutdownInstanceDisks(instance, self.cfg)
def _SafeShutdownInstanceDisks(instance, cfg):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  Raises OpExecError if the primary node cannot be contacted or the
  instance is still running.

  """
  ins_l = rpc.call_instance_list([instance.primary_node])
  ins_l = ins_l[instance.primary_node]
  # a non-list answer means the RPC to the node failed
  if not isinstance(ins_l, list):
    raise errors.OpExecError("Can't contact node '%s'" %
                             instance.primary_node)

  if instance.name in ins_l:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")

  _ShutdownInstanceDisks(instance, cfg)
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are ignored
  (the result stays true despite them); errors on other nodes always
  make the result false.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        # only a primary-node failure may be ignored, and only on request
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result
def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function check if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raise an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                               " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  # the node may answer but without usable memory information
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem))
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  Assembles the instance's disks, then asks the primary node to start
  the instance; checks bridges and free memory beforehand.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # the node locks are computed later, once the instance is known
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # check bridges existance
    _CheckInstanceBridgesExist(instance)

    # make sure the primary node can host the instance's memory
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    # mark the instance up before actually starting it, so a crash
    # leaves it flagged for the watcher to restart
    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      # roll back the disk assembly before reporting the failure
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")
class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  Supports three reboot types: soft (in-hypervisor), hard (recreate the
  hypervisor config) and full (shutdown + disk restart + startup).

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Soft/hard reboots run entirely on the primary node; only a full
      # reboot (disk shutdown/startup) touches the secondary nodes.
      # (The previous code compared the bare constant, which is always
      # true, so secondary locks were always taken.)
      primary_only = self.op.reboot_type != constants.INSTANCE_REBOOT_FULL
      self._LockInstancesNodes(primary_only=primary_only)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # check bridges existance
    _CheckInstanceBridgesExist(instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      # both soft and hard reboots are handled by the node daemon
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      # full reboot: shutdown, restart the disks, start again
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)
class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  Stops the instance on its primary node and shuts down its disks.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # the node locks are computed later, once the instance is known
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    # record the intent first, so the watcher will not restart it
    self.cfg.MarkInstanceDown(instance.name)
    if not rpc.call_instance_shutdown(node_current, instance):
      # best-effort: log but continue so the disks still get shut down
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(instance, self.cfg)
class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  Re-runs the OS creation scripts on a stopped instance, optionally
  switching it to a different OS first.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # the node locks are computed later, once the instance is known
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    # also verify with the node, in case the config is out of date
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        # report the instance's primary node (the opcode has no pnode field)
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node"  % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      # always release the disks, even if the OS scripts failed
      _ShutdownInstanceDisks(inst, self.cfg)
class LURenameInstance(LogicalUnit):
  """Rename an instance.

  Renames a stopped instance in the configuration, moves file-based
  storage if needed and runs the OS rename script.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    # also verify with the node, in case the config is out of date
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    # remember the storage path before the rename changes it
    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
                                                old_file_storage_dir,
                                                new_file_storage_dir)

      if not result:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                          "sda", "sdb"):
        # a failed rename script is not fatal; the config rename stands
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)
class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  Shuts the instance down, removes its disks and deletes it from the
  cluster configuration; failures can be ignored via ignore_failures.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # the node locks are computed later, once the instance is known
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    if not _RemoveDisks(instance, self.cfg):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)
    # release the instance lock together with the removal
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  Gathers static data from the configuration and, for dynamic fields
  (run state, memory), live data from the nodes.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    # dynamic fields need live node RPCs and therefore locking
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    self.static_fields = frozenset([
      "name", "os", "pnode", "snodes",
      "admin_state", "admin_ram",
      "disk_template", "ip", "mac", "bridge",
      "sda_size", "sdb_size", "vcpus", "tags",
      "auto_balance",
      "network_port", "kernel_path", "initrd_path",
      "hvm_boot_order", "hvm_acpi", "hvm_pae",
      "hvm_cdrom_image_path", "hvm_nic_type",
      "hvm_disk_type", "vnc_bind_address",
      ])
    _CheckOutputFields(static=self.static_fields,
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    # queries only need shared locks
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    # purely static queries can run lockless
    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.do_locking:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllInstancesInfo()
    if self.do_locking:
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
    else:
      instance_names = all_info.keys()
    instance_list = [all_info[iname] for iname in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          # a False answer means the RPC to the node failed
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          # combine the admin state with the observed run state
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          disk = instance.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        elif field == "vcpus":
          val = instance.vcpus
        elif field == "tags":
          val = list(instance.GetTags())
        elif field in ("network_port", "kernel_path", "initrd_path",
                       "hvm_boot_order", "hvm_acpi", "hvm_pae",
                       "hvm_cdrom_image_path", "hvm_nic_type",
                       "hvm_disk_type", "vnc_bind_address"):
          val = getattr(instance, field, None)
          if val is not None:
            pass
          elif field in ("hvm_nic_type", "hvm_disk_type",
                         "kernel_path", "initrd_path"):
            val = "default"
          else:
            val = "-"
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output
class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  Moves an instance from its primary node to its first secondary node
  by shutting it down on the primary and, if it was marked as up,
  starting it again on the secondary. Only instances with a
  network-mirrored disk template (constants.DTS_NET_MIRROR) can be
  failed over.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    # The node locks cannot be known yet; declare them empty and mark
    # them for recalculation once the instance lock is held (see
    # DeclareLocks below).
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    # with the instance lock held, its node list is stable and the
    # node locks can now be acquired
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster, uses a mirrored
    disk template, has a secondary node with enough free memory, and
    that all its bridges exist on the target node.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    # failover always targets the first secondary
    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
                         instance.name, instance.memory)

    # check bridge existance
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    # degraded disks on the target abort the failover unless the
    # instance is already down or the user asked to ignore consistency
    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    # a failed shutdown is only tolerated in ignore_consistency mode
    # (e.g. when the source node is dead)
    if not rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down"  %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    # record the new primary before (re)starting the instance
    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logger.Info("Starting instance %s on node %s" %
                  (instance.name, target_node))

      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                               ignore_secondaries=True)
      if not disks_ok:
        # roll back disk activation before failing
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))


def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
  """Recursively create a tree of block devices on the primary node.

  Every device in the tree is created unconditionally, children
  first. Returns True on success, and False as soon as any single
  creation fails.

  """
  # depth-first: children must exist before their parent device
  for child in (device.children or []):
    if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
      return False

  cfg.SetDiskID(device, node)
  result = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, True, info)
  if not result:
    return False
  # remember the physical id returned by the node, unless already set
  if device.physical_id is None:
    device.physical_id = result
  return True


def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Recursively create a tree of block devices on a secondary node.

  A device is created here if it declares it must exist on
  secondaries (CreateOnSecondary) or if 'force' is set; in either
  case all of its children are created too. Otherwise the device is
  skipped but the recursion still descends into its children with the
  same 'force' value. Returns True on success, False on the first
  failure.

  """
  create_here = device.CreateOnSecondary() or force

  # children first, so the parent can be assembled on top of them
  for child in (device.children or []):
    if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                      child, create_here, info):
      return False

  if not create_here:
    return True
  cfg.SetDiskID(device, node)
  result = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, False, info)
  if not result:
    return False
  # remember the physical id returned by the node, unless already set
  if device.physical_id is None:
    device.physical_id = result
  return True


def _GenerateUniqueNames(cfg, exts):
2888
  """Generate a suitable LV name.
2889

2890
  This will generate a logical volume name for the given instance.
2891

2892
  """
2893
  results = []
2894
  for val in exts:
2895
    new_id = cfg.GenerateUniqueID()
2896
    results.append("%s%s" % (new_id, val))
2897
  return results
2898

    
2899

    
2900
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
  """Build a DRBD8 disk object together with its two LV children.

  Allocates a DRBD port from the configuration and returns an
  objects.Disk of type LD_DRBD8 backed by two logical volumes:
  'names[0]' holding the data (of the requested size) and 'names[1]'
  holding the 128 MB metadata.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  data_lv = objects.Disk(dev_type=constants.LD_LV, size=size,
                         logical_id=(vgname, names[0]))
  meta_lv = objects.Disk(dev_type=constants.LD_LV, size=128,
                         logical_id=(vgname, names[1]))
  return objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                      logical_id=(primary, secondary, port),
                      children=[data_lv, meta_lv],
                      iv_name=iv_name)


def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  Returns the list of objects.Disk instances (the 'sda' data disk and
  'sdb' swap disk) appropriate for 'template_name'. Raises
  ProgrammerError when the number of secondary nodes does not match
  the template's requirements, or when the template is unknown.

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()

  if template_name == constants.DT_DISKLESS:
    return []

  if template_name == constants.DT_PLAIN:
    # plain LVs live only on the primary node
    if secondary_nodes:
      raise errors.ProgrammerError("Wrong template configuration")

    lv_names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    return [
      objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                   logical_id=(vgname, lv_names[0]),
                   iv_name="sda"),
      objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                   logical_id=(vgname, lv_names[1]),
                   iv_name="sdb"),
      ]

  if template_name == constants.DT_DRBD8:
    # drbd8 mirrors to exactly one secondary node
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    lv_names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                          ".sdb_data", ".sdb_meta"])
    return [
      _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                           disk_sz, lv_names[0:2], "sda"),
      _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                           swap_sz, lv_names[2:4], "sdb"),
      ]

  if template_name == constants.DT_FILE:
    # file-backed disks live only on the primary node
    if secondary_nodes:
      raise errors.ProgrammerError("Wrong template configuration")

    return [
      objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
                   iv_name="sda",
                   logical_id=(file_driver,
                               "%s/sda" % file_storage_dir)),
      objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
                   iv_name="sdb",
                   logical_id=(file_driver,
                               "%s/sdb" % file_storage_dir)),
      ]

  raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)


def _GetInstanceInfoText(instance):
2969
  """Compute that text that should be added to the disk's metadata.
2970

2971
  """
2972
  return "originstname+%s" % instance.name
2973

    
2974

    
2975
def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance. For file-based
  instances the storage directory is created first; then every disk
  is created on all secondary nodes before the primary node.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = rpc.call_file_storage_dir_create(instance.primary_node,
                                              file_storage_dir)

    # a falsy result means the RPC to the node itself failed
    if not result:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False

    # result[0] is the success flag of the directory creation
    if not result[0]:
      logger.Error("failed to create directory '%s'" % file_storage_dir)
      return False

  for disk in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (disk.iv_name, instance.name))
    #HARDCODE
    for snode in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, snode, instance,
                                        disk, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (disk.iv_name, disk, snode))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, disk, info):
      logger.Error("failed to create volume %s on primary!" %
                   disk.iv_name)
      return False

  return True


def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  all_ok = True
  for disk in instance.disks:
    # walk the whole device tree on every node it spans
    for node, subdev in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(subdev, node)
      if not rpc.call_blockdev_remove(node, subdev):
        # best-effort: log and keep going with the remaining devices
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (disk.iv_name, node))
        all_ok = False

  if instance.disk_template == constants.DT_FILE:
    # file-based instances also have a storage directory to clean up
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
                                            file_storage_dir):
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      all_ok = False

  return all_ok


def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group.

  This is currently hard-coded for the two-drive layout. Templates
  that do not consume volume group space yield None.

  """
  # Required free disk space as a function of disk and swap space
  req_size_map = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    # 256 MB are added for drbd metadata, 128MB for each drbd device
    constants.DT_DRBD8: disk_size + swap_size + 256,
    constants.DT_FILE: None,
  }

  try:
    return req_size_map[disk_template]
  except KeyError:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)


class LUCreateInstance(LogicalUnit):
3082
  """Create an instance.
3083

3084
  """
3085
  HPATH = "instance-add"
3086
  HTYPE = constants.HTYPE_INSTANCE
3087
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
3088
              "disk_template", "swap_size", "mode", "start", "vcpus",
3089
              "wait_for_sync", "ip_check", "mac"]
3090
  REQ_BGL = False
3091

    
3092
  def _ExpandNode(self, node):
3093
    """Expands and checks one node name.
3094

3095
    """
3096
    node_full = self.cfg.ExpandNodeName(node)
3097
    if node_full is None:
3098
      raise errors.OpPrereqError("Unknown node %s" % node)
3099
    return node_full
3100

    
3101
  def ExpandNames(self):
3102
    """ExpandNames for CreateInstance.
3103

3104
    Figure out the right locks for instance creation.
3105

3106
    """
3107
    self.needed_locks = {}
3108

    
3109
    # set optional parameters to none if they don't exist
3110
    for attr in ["kernel_path", "initrd_path", "pnode", "snode",
3111
                 "iallocator", "hvm_boot_order", "hvm_acpi", "hvm_pae",
3112
                 "hvm_cdrom_image_path", "hvm_nic_type", "hvm_disk_type",
3113
                 "vnc_bind_address"]:
3114
      if not hasattr(self.op, attr):
3115
        setattr(self.op, attr, None)
3116

    
3117
    # verify creation mode
3118
    if self.op.mode not in (constants.INSTANCE_CREATE,
3119
                            constants.INSTANCE_IMPORT):
3120
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3121
                                 self.op.mode)
3122
    # disk template and mirror node verification
3123
    if self.op.disk_template not in constants.DISK_TEMPLATES:
3124
      raise errors.OpPrereqError("Invalid disk template name")
3125

    
3126
    #### instance parameters check
3127

    
3128
    # instance name verification
3129
    hostname1 = utils.HostInfo(self.op.instance_name)
3130
    self.op.instance_name = instance_name = hostname1.name
3131

    
3132
    # this is just a preventive check, but someone might still add this
3133
    # instance in the meantime, and creation will fail at lock-add time
3134
    if instance_name in self.cfg.GetInstanceList():
3135
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3136
                                 instance_name)
3137

    
3138
    self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3139

    
3140
    # ip validity checks
3141
    ip = getattr(self.op, "ip", None)
3142
    if ip is None or ip.lower() == "none":
3143
      inst_ip = None
3144
    elif ip.lower() == "auto":
3145
      inst_ip = hostname1.ip
3146
    else:
3147
      if not utils.IsValidIP(ip):
3148
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
3149
                                   " like a valid IP" % ip)
3150
      inst_ip = ip
3151
    self.inst_ip = self.op.ip = inst_ip
3152
    # used in CheckPrereq for ip ping check
3153
    self.check_ip = hostname1.ip
3154

    
3155
    # MAC address verification
3156
    if self.op.mac != "auto":
3157
      if not utils.IsValidMac(self.op.mac.lower()):
3158
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
3159
                                   self.op.mac)
3160

    
3161
    # boot order verification
3162
    if self.op.hvm_boot_order is not None:
3163
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3164
        raise errors.OpPrereqError("invalid boot order specified,"
3165
                                   " must be one or more of [acdn]")
3166
    # file storage checks
3167
    if (self.op.file_driver and
3168
        not self.op.file_driver in constants.FILE_DRIVER):
3169
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3170
                                 self.op.file_driver)
3171

    
3172
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3173
      raise errors.OpPrereqError("File storage directory path not absolute")
3174

    
3175
    ### Node/iallocator related checks
3176
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
3177
      raise errors.OpPrereqError("One and only one of iallocator and primary"
3178
                                 " node must be given")
3179

    
3180
    if self.op.iallocator:
3181
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3182
    else:
3183
      self.op.pnode = self._ExpandNode(self.op.pnode)
3184
      nodelist = [self.op.pnode]
3185
      if self.op.snode is not None:
3186
        self.op.snode = self._ExpandNode(self.op.snode)
3187
        nodelist.append(self.op.snode)
3188
      self.needed_locks[locking.LEVEL_NODE] = nodelist
3189

    
3190
    # in case of import lock the source node too
3191
    if self.op.mode == constants.INSTANCE_IMPORT:
3192
      src_node = getattr(self.op, "src_node", None)
3193
      src_path = getattr(self.op, "src_path", None)
3194

    
3195
      if src_node is None or src_path is None:
3196
        raise errors.OpPrereqError("Importing an instance requires source"
3197
                                   " node and path options")
3198

    
3199
      if not os.path.isabs(src_path):
3200
        raise errors.OpPrereqError("The source path must be absolute")
3201

    
3202
      self.op.src_node = src_node = self._ExpandNode(src_node)
3203
      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3204
        self.needed_locks[locking.LEVEL_NODE].append(src_node)
3205

    
3206
    else: # INSTANCE_CREATE
3207
      if getattr(self.op, "os_type", None) is None:
3208
        raise errors.OpPrereqError("No guest OS specified")
3209

    
3210
  def _RunAllocator(self):
3211
    """Run the allocator based on input opcode.
3212

3213
    """
3214
    disks = [{"size": self.op.disk_size, "mode": "w"},
3215
             {"size": self.op.swap_size, "mode": "w"}]
3216
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3217
             "bridge": self.op.bridge}]
3218
    ial = IAllocator(self.cfg, self.sstore,
3219
                     mode=constants.IALLOCATOR_MODE_ALLOC,
3220
                     name=self.op.instance_name,
3221
                     disk_template=self.op.disk_template,
3222
                     tags=[],
3223
                     os=self.op.os_type,
3224
                     vcpus=self.op.vcpus,
3225
                     mem_size=self.op.mem_size,
3226
                     disks=disks,
3227
                     nics=nics,
3228
                     )
3229

    
3230
    ial.Run(self.op.iallocator)
3231

    
3232
    if not ial.success:
3233
      raise errors.OpPrereqError("Can't compute nodes using"
3234
                                 " iallocator '%s': %s" % (self.op.iallocator,
3235
                                                           ial.info))
3236
    if len(ial.nodes) != ial.required_nodes:
3237
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3238
                                 " of nodes (%s), required %s" %
3239
                                 (len(ial.nodes), ial.required_nodes))
3240
    self.op.pnode = ial.nodes[0]
3241
    logger.ToStdout("Selected nodes for the instance: %s" %
3242
                    (", ".join(ial.nodes),))
3243
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3244
                (self.op.instance_name, self.op.iallocator, ial.nodes))
3245
    if ial.required_nodes == 2:
3246
      self.op.snode = ial.nodes[1]
3247

    
3248
  def BuildHooksEnv(self):
3249
    """Build hooks env.
3250

3251
    This runs on master, primary and secondary nodes of the instance.
3252

3253
    """
3254
    env = {
3255
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3256
      "INSTANCE_DISK_SIZE": self.op.disk_size,
3257
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
3258
      "INSTANCE_ADD_MODE": self.op.mode,
3259
      }
3260
    if self.op.mode == constants.INSTANCE_IMPORT:
3261
      env["INSTANCE_SRC_NODE"] = self.op.src_node
3262
      env["INSTANCE_SRC_PATH"] = self.op.src_path
3263
      env["INSTANCE_SRC_IMAGE"] = self.src_image
3264

    
3265
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3266
      primary_node=self.op.pnode,
3267
      secondary_nodes=self.secondaries,
3268
      status=self.instance_status,
3269
      os_type=self.op.os_type,
3270
      memory=self.op.mem_size,
3271
      vcpus=self.op.vcpus,
3272
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3273
    ))
3274

    
3275
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3276
          self.secondaries)
3277
    return env, nl, nl
3278

    
3279

    
3280
  def CheckPrereq(self):
3281
    """Check prerequisites.
3282

3283
    """
3284
    if (not self.cfg.GetVGName() and
3285
        self.op.disk_template not in constants.DTS_NOT_LVM):
3286
      raise errors.OpPrereqError("Cluster does not support lvm-based"
3287
                                 " instances")
3288

    
3289
    if self.op.mode == constants.INSTANCE_IMPORT:
3290
      src_node = self.op.src_node
3291
      src_path = self.op.src_path
3292

    
3293
      export_info = rpc.call_export_info(src_node, src_path)
3294

    
3295
      if not export_info:
3296
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
3297

    
3298
      if not export_info.has_section(constants.INISECT_EXP):
3299
        raise errors.ProgrammerError("Corrupted export config")
3300

    
3301
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
3302
      if (int(ei_version) != constants.EXPORT_VERSION):
3303
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3304
                                   (ei_version, constants.EXPORT_VERSION))
3305

    
3306
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3307
        raise errors.OpPrereqError("Can't import instance with more than"
3308
                                   " one data disk")
3309

    
3310
      # FIXME: are the old os-es, disk sizes, etc. useful?
3311
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3312
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3313
                                                         'disk0_dump'))
3314
      self.src_image = diskimage
3315

    
3316
    # ip ping checks (we use the same ip that was resolved in ExpandNames)
3317

    
3318
    if self.op.start and not self.op.ip_check:
3319
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3320
                                 " adding an instance in start mode")
3321

    
3322
    if self.op.ip_check:
3323
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3324
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3325
                                   (self.check_ip, instance_name))
3326

    
3327
    # bridge verification
3328
    bridge = getattr(self.op, "bridge", None)
3329
    if bridge is None:
3330
      self.op.bridge = self.cfg.GetDefBridge()
3331
    else:
3332
      self.op.bridge = bridge
3333

    
3334
    #### allocator run
3335

    
3336
    if self.op.iallocator is not None:
3337
      self._RunAllocator()
3338

    
3339
    #### node related checks
3340

    
3341
    # check primary node
3342
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3343
    assert self.pnode is not None, \
3344
      "Cannot retrieve locked node %s" % self.op.pnode
3345
    self.secondaries = []
3346

    
3347
    # mirror node verification
3348
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3349
      if self.op.snode is None:
3350
        raise errors.OpPrereqError("The networked disk templates need"
3351
                                   " a mirror node")
3352
      if self.op.snode == pnode.name:
3353
        raise errors.OpPrereqError("The secondary node cannot be"
3354
                                   " the primary node.")
3355
      self.secondaries.append(self.op.snode)
3356

    
3357
    req_size = _ComputeDiskSize(self.op.disk_template,
3358
                                self.op.disk_size, self.op.swap_size)
3359

    
3360
    # Check lv size requirements
3361
    if req_size is not None:
3362
      nodenames = [pnode.name] + self.secondaries
3363
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3364
      for node in nodenames:
3365
        info = nodeinfo.get(node, None)
3366
        if not info:
3367
          raise errors.OpPrereqError("Cannot get current information"
3368
                                     " from node '%s'" % node)
3369
        vg_free = info.get('vg_free', None)
3370
        if not isinstance(vg_free, int):
3371
          raise errors.OpPrereqError("Can't compute free disk space on"
3372
                                     " node %s" % node)
3373
        if req_size > info['vg_free']:
3374
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3375
                                     " %d MB available, %d MB required" %
3376
                                     (node, info['vg_free'], req_size))
3377

    
3378
    # os verification
3379
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3380
    if not os_obj:
3381
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3382
                                 " primary node"  % self.op.os_type)
3383

    
3384
    if self.op.kernel_path == constants.VALUE_NONE:
3385
      raise errors.OpPrereqError("Can't set instance kernel to none")
3386

    
3387
    # bridge check on primary node
3388
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3389
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3390
                                 " destination node '%s'" %
3391
                                 (self.op.bridge, pnode.name))
3392

    
3393
    # memory check on primary node
3394
    if self.op.start:
3395
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3396
                           "creating instance %s" % self.op.instance_name,
3397
                           self.op.mem_size)
3398

    
3399
    # hvm_cdrom_image_path verification
3400
    if self.op.hvm_cdrom_image_path is not None:
3401
      # FIXME (als): shouldn't these checks happen on the destination node?
3402
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
3403
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
3404
                                   " be an absolute path or None, not %s" %
3405
                                   self.op.hvm_cdrom_image_path)
3406
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
3407
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
3408
                                   " regular file or a symlink pointing to"
3409
                                   " an existing regular file, not %s" %
3410
                                   self.op.hvm_cdrom_image_path)
3411

    
3412
    # vnc_bind_address verification
3413
    if self.op.vnc_bind_address is not None:
3414
      if not utils.IsValidIP(self.op.vnc_bind_address):
3415
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3416
                                   " like a valid IP address" %
3417
                                   self.op.vnc_bind_address)
3418

    
3419
    # Xen HVM device type checks
3420
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
3421
      if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
3422
        raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
3423
                                   " hypervisor" % self.op.hvm_nic_type)
3424
      if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
3425
        raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
3426
                                   " hypervisor" % self.op.hvm_disk_type)
3427

    
3428
    if self.op.start:
3429
      self.instance_status = 'up'
3430
    else:
3431
      self.instance_status = 'down'
3432

    
3433
  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    Allocates the MAC/NIC/network port, generates the disk layout,
    registers the instance in the configuration, optionally waits for
    the disks to sync, runs the OS creation/import scripts and finally
    starts the instance if requested.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    # "auto" means we are responsible for generating a fresh MAC address
    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    # some hypervisors (e.g. HVM with VNC) need a network port allocated
    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    if self.op.vnc_bind_address is None:
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.sstore.GetFileStorageDir(),
                                        string_file_storage_dir, instance))

    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            hvm_acpi=self.op.hvm_acpi,
                            hvm_pae=self.op.hvm_pae,
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
                            vnc_bind_address=self.op.vnc_bind_address,
                            hvm_nic_type=self.op.hvm_nic_type,
                            hvm_disk_type=self.op.hvm_disk_type,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      # roll back any devices that were created before the failure
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Declare that we don't want to remove the instance lock anymore, as we've
    # added the instance to the config
    del self.remove_locks[locking.LEVEL_INSTANCE]

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      # undo both the disks and the config entry added above
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      # Make sure the instance lock gets removed
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                                src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")
class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand the instance name and acquire its lock."""
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    Returns the ssh command line (built via self.ssh.BuildCmd) that the
    caller must run on the master node to reach the console.

    """
    instance = self.instance
    node = instance.primary_node

    # verify the node is reachable and the instance is actually running
    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    # the hypervisor knows the actual console command for this instance
    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  # required opcode parameters; remote_node/iallocator are optional extras
  _OP_REQP = ["instance_name", "mode", "disks"]
  REQ_BGL = False
  def ExpandNames(self):
    """Expand the instance name and compute the node locks needed.

    Three locking strategies, depending on how the new secondary is
    chosen: lock all nodes when an iallocator will pick it, lock just
    the named node when given explicitly, or defer to the instance's
    own nodes otherwise (resolved later in DeclareLocks).

    """
    self._ExpandAndLockInstance()

    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      # iallocator and an explicit secondary are mutually exclusive
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      # the allocator may pick any node, so we must lock them all
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    elif self.op.remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      # store the expanded (canonical) node name back into the opcode
      self.op.remote_node = remote_node
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
      # the instance's own nodes get appended in DeclareLocks
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
  def DeclareLocks(self, level):
    """Declare node locks for the requested locking level.

    """
    # If we're not already locking all nodes in the set we have to declare the
    # instance's primary/secondary nodes.
    if (level == locking.LEVEL_NODE and
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
      self._LockInstancesNodes()
  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    On success, stores the chosen node in self.op.remote_node.

    Raises:
      errors.OpPrereqError: if the allocator fails or returns an
        unexpected number of nodes

    """
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      # the format string has three placeholders; the original code only
      # supplied two values, which would raise TypeError instead of the
      # intended error message
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    logger.ToStdout("Selected new secondary for the instance: %s" %
                    self.op.remote_node)
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    Returns:
      tuple of (env dict, pre-hook node list, post-hook node list)

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    # only notify the new secondary if one was actually given/selected
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl
  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster, that its disk
    layout supports replacement, and normalizes/validates the replace
    mode and remote node.  Sets self.instance, self.sec_node,
    self.remote_node_info and (for DRBD8) self.tgt_node/self.oth_node
    or self.new_node.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    # run the allocator if requested; it fills in self.op.remote_node
    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    # finally, make sure all requested disks actually exist
    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for dbrd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaces.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      # the DRBD meta device is a fixed 128MB LV
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      # keep the config objects in sync with the on-node renames
      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            # NOTE(review): the "%s" below is never filled in - presumably
            # the device name was meant to be interpolated; verify against
            # LogWarning's signature before changing
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue
  def _ExecD8Secondary(self, feedback_fn):
3932
    """Replace the secondary node for drbd8.
3933

3934
    The algorithm for replace is quite complicated:
3935
      - for all disks of the instance:
3936
        - create new LVs on the new node with same names
3937
        - shutdown the drbd device on the old secondary
3938
        - disconnect the drbd network on the primary
3939
        - create the drbd device on the new secondary
3940
        - network attach the drbd on the primary, using an artifice:
3941
          the drbd code for Attach() will connect to the network if it
3942
          finds a device which is connected to the good local disks but
3943
          not network enabled
3944
      - wait for sync across all devices
3945
      - remove all disks from the old secondary
3946

3947
    Failures are not very well handled.
3948

3949
    """
3950
    steps_total = 6
3951
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3952
    instance = self.instance
3953
    iv_names = {}
3954
    vgname = self.cfg.GetVGName()
3955
    # start of work
3956
    cfg = self.cfg
3957
    old_node = self.tgt_node