Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib.py @ b91a34a5

History | View | Annotate | Download (179.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import locking
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import serializer
45

    
46

    
47
class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_MASTER: the LU needs to run on the master node
        REQ_WSSTORE: the LU needs a writable SimpleStore
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None          # hooks directory name; None disables hooks for the LU
  HTYPE = None          # hooks target type, redefined together with HPATH
  _OP_REQP = []         # opcode attributes that must be present (not None)
  REQ_MASTER = True     # whether the LU must run on the master node
  REQ_WSSTORE = False   # whether the LU needs a writable SimpleStore
  REQ_BGL = True        # whether the LU holds the Big Ganeti Lock exclusively

  def __init__(self, processor, op, context, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    Validates that all attributes listed in _OP_REQP are set on the
    opcode, that the cluster is initialized, and (if REQ_MASTER) that we
    are running on the master node; any violation raises
    errors.OpPrereqError.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.sstore = sstore
    self.context = context
    # needed_locks stays None until ExpandNames fills it in; helpers such as
    # _ExpandAndLockInstance use the None value to know the dict is missing
    self.needed_locks = None
    self.acquired_locks = {}
    # 0 means "not shared": by default every level is acquired exclusively
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    # lazily-created SshRunner, exposed via the 'ssh' property below
    self.__ssh = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    if not self.cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    if self.REQ_MASTER:
      master = sstore.GetMasterNode()
      if master != utils.HostInfo().name:
        raise errors.OpPrereqError("Commands must be run on the master"
                                   " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object

    The runner is created on first access and cached for later calls.

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.sstore)
    return self.__ssh

  # read-only property so LUs can simply use self.ssh
  ssh = property(fget=__GetSSH)

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:
      - Use an empty dict if you don't need any lock
      - If you don't need any lock at a particular level omit that level
      - Don't put anything for the BGL level
      - If you want all locks at a level use None as a value
        (this reflects what LockSet does, and will be replaced before
        CheckPrereq with the full list of nodes that have been locked)

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples:
    # Acquire all nodes and one instance
    self.needed_locks = {
      locking.LEVEL_NODE: None,
      locking.LEVEL_INSTANCES: ['instance1.example.tld'],
    }
    # Acquire just two nodes
    self.needed_locks = {
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
    }
    # Acquire no locks
    self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-node tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: the previous result this LU had, or None in the PRE phase.

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      # calling this twice (or after manually setting instance locks) would
      # silently overwrite the earlier declaration, hence the assert
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to:

    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      wanted_nodes.extend(instance.secondary_nodes)
    self.needed_locks[locking.LEVEL_NODE] = wanted_nodes

    # consume the flag so a second call without re-setting it asserts
    del self.recalculate_locks[locking.LEVEL_NODE]
302

    
303

    
304
class NoHooksLU(LogicalUnit):
  """Base class for LogicalUnits that run no hooks.

  Deriving from this class instead of LogicalUnit saves each such LU
  from repeating the "no hooks" declaration, reducing duplicate code.

  """
  HTYPE = None
  HPATH = None
313

    
314

    
315
def _GetWantedNodes(lu, nodes):
  """Return the checked and expanded list of node names.

  Args:
    nodes: list of node names (strings); an empty list selects all
      nodes in the cluster

  Raises:
    errors.OpPrereqError: if the argument is not a list, or a name
      cannot be expanded to a known node

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    # no explicit selection: every configured node is wanted
    return utils.NiceSort(lu.cfg.GetNodeList())

  wanted = []
  for name in nodes:
    expanded = lu.cfg.ExpandNodeName(name)
    if expanded is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(expanded)
  return utils.NiceSort(wanted)
337

    
338

    
339
def _GetWantedInstances(lu, instances):
  """Return the checked and expanded list of instance names.

  Args:
    instances: list of instance names (strings); an empty list selects
      all instances in the cluster

  Raises:
    errors.OpPrereqError: if the argument is not a list, or a name
      cannot be expanded to a known instance

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if not instances:
    # no explicit selection: every configured instance is wanted
    return utils.NiceSort(lu.cfg.GetInstanceList())

  wanted = []
  for name in instances:
    expanded = lu.cfg.ExpandInstanceName(name)
    if expanded is None:
      raise errors.OpPrereqError("No such instance name '%s'" % name)
    wanted.append(expanded)
  return utils.NiceSort(wanted)
361

    
362

    
363
def _CheckOutputFields(static, dynamic, selected):
364
  """Checks whether all selected fields are valid.
365

366
  Args:
367
    static: Static fields
368
    dynamic: Dynamic fields
369

370
  """
371
  static_fields = frozenset(static)
372
  dynamic_fields = frozenset(dynamic)
373

    
374
  all_fields = static_fields | dynamic_fields
375

    
376
  if not all_fields.issuperset(selected):
377
    raise errors.OpPrereqError("Unknown output fields selected: %s"
378
                               % ",".join(frozenset(selected).
379
                                          difference(all_fields)))
380

    
381

    
382
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
383
                          memory, vcpus, nics):
384
  """Builds instance related env variables for hooks from single variables.
385

386
  Args:
387
    secondary_nodes: List of secondary nodes as strings
388
  """
389
  env = {
390
    "OP_TARGET": name,
391
    "INSTANCE_NAME": name,
392
    "INSTANCE_PRIMARY": primary_node,
393
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
394
    "INSTANCE_OS_TYPE": os_type,
395
    "INSTANCE_STATUS": status,
396
    "INSTANCE_MEMORY": memory,
397
    "INSTANCE_VCPUS": vcpus,
398
  }
399

    
400
  if nics:
401
    nic_count = len(nics)
402
    for idx, (ip, bridge, mac) in enumerate(nics):
403
      if ip is None:
404
        ip = ""
405
      env["INSTANCE_NIC%d_IP" % idx] = ip
406
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
407
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
408
  else:
409
    nic_count = 0
410

    
411
  env["INSTANCE_NIC_COUNT"] = nic_count
412

    
413
  return env
414

    
415

    
416
def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override

  Returns:
    dict of environment variables, as built by _BuildInstanceHookEnv

  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    # FIX: was 'instance.os', which exported the OS name as
    # INSTANCE_STATUS; the run status lives in instance.status (the
    # "up"/"down" value checked elsewhere in this module)
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)
436

    
437

    
438
def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  Args:
    instance: objects.Instance whose NIC bridges are verified on its
      primary node

  Raises:
    errors.OpPrereqError: if any required bridge is missing

  """
  target_node = instance.primary_node
  bridges = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(target_node, bridges):
    raise errors.OpPrereqError("one or more target bridges %s does not"
                               " exist on destination node '%s'" %
                               (bridges, target_node))
448

    
449

    
450
class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    The cluster can only be destroyed once it holds nothing but the
    master node and no instances.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master_name = self.sstore.GetMasterNode()

    node_names = self.cfg.GetNodeList()
    if len(node_names) != 1 or node_names[0] != master_name:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(node_names) - 1))
    instance_names = self.cfg.GetInstanceList()
    if instance_names:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instance_names))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    Disables the master role on the master node, backs up the root ssh
    key files, and returns the name of the (former) master node.

    """
    master_name = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master_name, False):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    for key_file in (priv_key, pub_key):
      utils.CreateBackup(key_file)
    return master_name
486

    
487

    
488
class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    Returns:
      True if any check failed ("bad"), False otherwise.

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        # NOTE(review): this loop variable shadows the 'node' parameter for
        # the rest of the method; harmless today only because the parameter
        # is not used after this point
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result['node-net-test'][node]))

    # NOTE(review): a hypervisor verify failure is reported but does NOT set
    # 'bad' — confirm whether that is intentional
    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    Returns:
      True if any check failed ("bad"), False otherwise.

    """
    bad = False

    node_current = instanceconfig.primary_node

    # map of node -> volumes the instance should have there
    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    # instances not configured as 'down' must be running on their primary
    if not instanceconfig.status == 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    # the instance must not be running anywhere but its primary
    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    Returns:
      True if any orphan volume was found ("bad"), False otherwise.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    Returns:
      True if any orphan instance was found ("bad"), False otherwise.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    Returns:
      True if some node lacks memory for the failovers it may have to
      host ("bad"), False otherwise.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          needed_mem += instance_cfg[instance].memory
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    Returns:
      True if the whole verification succeeded, False if any check
      failed (note: inverted with respect to the per-check 'bad' flags).

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    # gather all remote data up-front with multi-node rpc calls
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo]
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      # a string result means LVM itself reported an error on the node
      if isinstance(volumeinfo, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      # NOTE(review): this rebinds 'nodeinfo', shadowing the list of node
      # objects built before the rpc calls; safe because that list is no
      # longer used past this point, but worth renaming some day
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result =  self._VerifyInstance(instance, inst_config, node_volume,
                                     node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result, handle it, and send some
    nicely-formatted feedback back to the user.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: previous Exec result

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn("    Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      # NOTE(review): this return is inside the POST-phase branch, so in the
      # PRE phase the method implicitly returns None — confirm callers expect
      # that
      return lu_result
902

    
903

    
904
class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    Returns:
      a 4-tuple of (unreachable-nodes list, per-node lvm-error map,
      instances-with-offline-LVs list, per-instance missing-LVs map);
      note that `result` aliases the four collections below, so
      mutating them updates the returned value in place.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    # map every (node, lv-name) to its owning instance; only running,
    # network-mirrored instances are checked
    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      # no relevant LVs at all, nothing to verify
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        # FIX: a string result is a node-side error message; it must be
        # recorded and the node skipped — previously control fell
        # through to lvs.iteritems(), which raises AttributeError on a
        # string
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
        continue
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      # every LV we find on the node is removed from nv_dict; whatever
      # remains at the end was expected but not seen (i.e. missing)
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result
974

    
975

    
976
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]
  REQ_WSSTORE = True

  def BuildHooksEnv(self):
    """Build hooks env.

    The hooks run only on the master node, with the old name as target
    and the new name in the environment.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    Rejects a rename when neither name nor IP changes, and when the new
    IP is already live on the network.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      # a reachable new IP means something else already owns it
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    Stops the master role, rewrites the ssconf keys, pushes them to all
    other nodes, and restarts the master role even on failure.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        # the master already has the new files, no need to copy to itself
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            # best-effort: log the failure but keep distributing
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      # always try to bring the master role back up
      if not rpc.call_node_start_master(master, False):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")
1053

    
1054

    
1055
def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether a LD_LV dev_type was found or not

  """
  # Depth-first descent into the children (if any); a single LVM-based
  # child makes the whole device LVM-based.
  for subdisk in (disk.children or []):
    if _RecursiveCheckIfLVMBased(subdisk):
      return True
  # No LVM-based child found: the answer depends on this device itself.
  return disk.dev_type == constants.LD_LV
1070

    
1071

    
1072
class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    Runs on the master node only, exporting the new volume group name.

    """
    master = self.sstore.GetMasterNode()
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    return env, [master], [master]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if not self.op.vg_name:
      # Disabling LVM is only allowed when no instance uses LVM-based
      # disks anywhere in its disk tree.
      for iname in self.cfg.GetInstanceList():
        for disk in self.cfg.GetInstanceInfo(iname).disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      node_list = self.cfg.GetNodeList()
      vglist = rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name == self.cfg.GetVGName():
      # nothing to do, the configuration already matches
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")
    else:
      self.cfg.SetVGName(self.op.vg_name)
1127

    
1128

    
1129
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  Args:
    cfgw: configuration object, used to set the disks' physical IDs
    instance: the instance whose disks are polled on its primary node
    proc: object providing LogInfo/LogWarning feedback methods
    oneshot: if True, poll and report once instead of looping until done
    unlock: unused in this body — presumably kept for interface
      compatibility; TODO confirm against callers

  Returns:
    True if the disks finished syncing without degradation, False
    otherwise

  """
  if not instance.disks:
    # no disks at all: trivially in sync
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      # no data from the node: retry up to 10 times, 6 seconds apart
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    # a successful answer resets the retry counter
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      # degraded with no sync percentage means degraded-and-not-syncing
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        # a percentage value means this device is still syncing
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    # sleep until the next poll, bounded by the largest estimate (60s max)
    time.sleep(min(60, max_time))

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
1185

    
1186

    
1187
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  cfgw.SetDiskID(dev, node)
  # index into the blockdev_find result tuple: 5 is the is_degraded
  # flag, 6 is the ldisk status
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      # device not found or node unreachable: treat as inconsistent
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      result = False
    else:
      # the flag at idx is set when the device is degraded
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      # NOTE(review): the ldisk flag is not propagated to the children
      # here, so they are always checked via is_degraded — confirm
      # whether that is intentional
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result
1214

    
1215

    
1216
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    # name-based filtering is not implemented for this query
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
    _CheckOutputFields(static=[],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into an a per-os per-node dictionary

      Args:
        node_list: a list with the names of all nodes
        rlist: a map with node names as keys and OS objects as values

      Returns:
        map: a map with osnames as keys and as value another map, with
             nodes as
             keys and list of OS objects as values
             e.g. {"debian-etch": {"node1": [<object>,...],
                                   "node2": [<object>,]}
                  }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      # skip nodes that returned no data
      if not nr:
        continue
      for os_obj in nr:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    Returns one row per OS name, with one value per requested output
    field.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          # valid only if the first entry is truthy on every node
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          # per-node list of (status, path) pairs for this OS
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output
1294

    
1295

    
1296
class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      # FIX: replaced the deprecated "raise Class, args" statement
      # (invalid in Python 3, inconsistent with every other raise in
      # this module) with the call-style raise
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    # the node must not be used by any instance, neither as primary nor
    # as secondary
    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    # drop the node from the cluster configuration/context first...
    self.context.RemoveNode(node.name)

    # ...then tell the node itself to leave the cluster
    rpc.call_node_leave_cluster(node.name)
1363

    
1364

    
1365
class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    # fields that require a live rpc call to the nodes
    self.dynamic_fields = frozenset([
      "dtotal", "dfree",
      "mtotal", "mnode", "mfree",
      "bootid",
      "ctotal",
      ])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip", "tags"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    # TODO: we could lock nodes only if the user asked for dynamic fields. For
    # that we need atomic ways to get info for a group of nodes from the
    # config, though.
    if not self.op.names:
      # no names given: lock (and thus query) all nodes
      self.needed_locks[locking.LEVEL_NODE] = None
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.names)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # This of course is valid only if we locked the nodes
    self.wanted = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    Returns one row per node, with one value per requested output field.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      # at least one dynamic field requested: ask the nodes
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          # node did not answer: dynamic fields will render as None
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    # instance maps are only built when instance fields were requested
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output
1479

    
1480

    
1481
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    Returns one row per logical volume found on the selected nodes,
    with one value per requested output field.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    # per-instance map of node -> list of LV names, used to attribute a
    # volume to the instance that owns it
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      # skip nodes that returned no (or falsy) volume data
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            # find the instance owning this LV on this node; the
            # for/else falls through to '-' when none matches
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output
1549

    
1550

    
1551
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) matches the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    # a missing secondary IP means a single-homed node
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    # NOTE(review): self.op.readd is read here but is not in _OP_REQP —
    # presumably defaulted by the opcode definition; confirm
    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        # a readded node must keep its previous IP configuration
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      # no IP of the new node may already be used by an existing node
      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachablity
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    Verifies connectivity and protocol version, distributes ssh keys and
    config files, and finally registers the node in the cluster context.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    # read all key files locally, in the order listed above
    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      # ask the new node itself to verify its own secondary IP
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    # have the master verify ssh/hostname resolution of the new node
    node_verify_list = [self.sstore.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
    }

    result = rpc.call_node_verify(node_verify_list, node_verify_param)
    for verifier in node_verify_list:
      if not result[verifier]:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                 " for remote verification" % verifier)
      if result[verifier]['nodelist']:
        for failed in result[verifier]['nodelist']:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, result[verifier]['nodelist'][failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          # best-effort: log the failure but continue with other files
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    # copy the ssconf files (and the VNC password for HVM clusters) to
    # the new node only
    to_copy = self.sstore.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      result = rpc.call_upload_file([node], fname)
      if not result[node]:
        logger.Error("could not copy file %s to node %s" % (fname, node))

    if self.op.readd:
      self.context.ReaddNode(new_node)
    else:
      self.context.AddNode(new_node)
1752

    
1753

    
1754
class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False
  REQ_BGL = False

  def ExpandNames(self):
    # purely informational LU: nothing needs to be locked
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    sstore = self.sstore
    cluster_info = {}
    cluster_info["name"] = sstore.GetClusterName()
    cluster_info["software_version"] = constants.RELEASE_VERSION
    cluster_info["protocol_version"] = constants.PROTOCOL_VERSION
    cluster_info["config_version"] = constants.CONFIG_VERSION
    cluster_info["os_api_version"] = constants.OS_API_VERSION
    cluster_info["export_version"] = constants.EXPORT_VERSION
    cluster_info["master"] = sstore.GetMasterNode()
    cluster_info["architecture"] = (platform.architecture()[0],
                                    platform.machine())
    cluster_info["hypervisor_type"] = sstore.GetHypervisorType()
    return cluster_info
1788

    
1789

    
1790
class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    # this LU reads only the local config object: no locks are needed
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites.

    """

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()
1811

    
1812

    
1813
class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    full_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    inst = self.cfg.GetInstanceInfo(full_name)
    if inst is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = inst

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    assembled, dev_mapping = _AssembleInstanceDisks(self.instance, self.cfg)
    if not assembled:
      raise errors.OpExecError("Cannot activate block devices")

    return dev_mapping
1842

    
1843

    
1844
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
         suceeded with the mapping from node devices to instance devices
  """
  dev_mapping = []
  all_ok = True
  inst_name = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occured, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for idisk in instance.disks:
    for node_name, dev in idisk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(dev, node_name)
      if not rpc.call_blockdev_assemble(node_name, dev, inst_name, False):
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" %
                     (idisk.iv_name, node_name))
        if not ignore_secondaries:
          all_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for idisk in instance.disks:
    for node_name, dev in idisk.ComputeNodeTree(instance.primary_node):
      if node_name != instance.primary_node:
        continue
      cfg.SetDiskID(dev, node_name)
      result = rpc.call_blockdev_assemble(node_name, dev, inst_name, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" %
                     (idisk.iv_name, node_name))
        all_ok = False
    dev_mapping.append((instance.primary_node, idisk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return all_ok, dev_mapping
1904

    
1905

    
1906
def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  assembled, _ = _AssembleInstanceDisks(instance, cfg,
                                        ignore_secondaries=force)
  if assembled:
    return
  # assembly failed: tear down whatever did come up before bailing out
  _ShutdownInstanceDisks(instance, cfg)
  if force is not None and not force:
    logger.Error("If the message above refers to a secondary node,"
                 " you can retry the operation using '--force'.")
  raise errors.OpExecError("Disk consistency error")
1918

    
1919

    
1920
class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    full_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    inst = self.cfg.GetInstanceInfo(full_name)
    if inst is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = inst

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    inst = self.instance
    running = rpc.call_instance_list([inst.primary_node])
    running = running[inst.primary_node]
    if type(running) is not list:
      raise errors.OpExecError("Can't contact node '%s'" %
                               inst.primary_node)

    # refuse to tear the disks down under a live instance
    if inst.name in running:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(inst, self.cfg)
1955

    
1956

    
1957
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are only
  logged and do not affect the return value; failures on any other
  node always yield a False result.

  Args:
    instance: a ganeti.objects.Instance object
    cfg: a ConfigWriter instance
    ignore_primary: whether shutdown failures on the primary node
                    should still allow a True (success) result

  Returns:
    True if every required shutdown succeeded, False otherwise

  """
  # NOTE: the original docstring stated the ignore_primary condition
  # inverted ("if ... false, errors ... are ignored"); the code below has
  # always ignored primary-node errors only when ignore_primary is true.
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        # a failure is fatal unless it happened on the primary node and
        # the caller asked for primary-node errors to be ignored
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result
1976

    
1977

    
1978
def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  # FIX: also guard against a missing or failed per-node entry; the
  # original only validated the outer dict, so nodeinfo[node] below could
  # raise KeyError/AttributeError instead of a clean OpPrereqError
  if not isinstance(nodeinfo, dict) or not nodeinfo.get(node):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                             " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                             " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                             " needed %s MiB, available %s MiB" %
                             (node, reason, requested, free_mem))
2006

    
2007

    
2008
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {"FORCE": self.op.force}
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    node_list = [self.sstore.GetMasterNode(), self.instance.primary_node]
    node_list.extend(self.instance.secondary_nodes)
    return env, node_list, node_list

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # the target bridges must exist before starting the instance
    _CheckInstanceBridgesExist(self.instance)

    # and the primary node must have room for the instance's memory
    _CheckNodeFreeMemory(self.cfg, self.instance.primary_node,
                         "starting instance %s" % self.instance.name,
                         self.instance.memory)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    inst = self.instance
    extra_args = getattr(self.op, "extra_args", "")

    self.cfg.MarkInstanceUp(inst.name)

    pnode = inst.primary_node

    _StartInstanceDisks(self.cfg, inst, self.op.force)

    if not rpc.call_instance_start(pnode, inst, extra_args):
      _ShutdownInstanceDisks(inst, self.cfg)
      raise errors.OpExecError("Could not start instance")
2074

    
2075

    
2076
class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    # validate the reboot type before acquiring any locks
    if self.op.reboot_type not in (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL):
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # FIXME: lock only primary on (not constants.INSTANCE_REBOOT_FULL)
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {"IGNORE_SECONDARIES": self.op.ignore_secondaries}
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    node_list = [self.sstore.GetMasterNode(), self.instance.primary_node]
    node_list.extend(self.instance.secondary_nodes)
    return env, node_list, node_list

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # the instance's bridges must exist for it to come back up
    _CheckInstanceBridgesExist(self.instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    inst = self.instance
    extra_args = getattr(self.op, "extra_args", "")
    pnode = inst.primary_node

    if self.op.reboot_type in (constants.INSTANCE_REBOOT_SOFT,
                               constants.INSTANCE_REBOOT_HARD):
      # soft/hard reboots are handled entirely by the hypervisor
      if not rpc.call_instance_reboot(pnode, inst,
                                      self.op.reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      # full reboot: stop the instance, recycle its disks, start it again
      if not rpc.call_instance_shutdown(pnode, inst):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(inst, self.cfg)
      _StartInstanceDisks(self.cfg, inst, self.op.ignore_secondaries)
      if not rpc.call_instance_start(pnode, inst, extra_args):
        _ShutdownInstanceDisks(inst, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(inst.name)
2155

    
2156

    
2157
class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    node_list = [self.sstore.GetMasterNode(), self.instance.primary_node]
    node_list.extend(self.instance.secondary_nodes)
    return env, node_list, node_list

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    inst = self.instance
    pnode = inst.primary_node
    # mark it down first so a crash here doesn't leave it "up" in config
    self.cfg.MarkInstanceDown(inst.name)
    if not rpc.call_instance_shutdown(pnode, inst):
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(inst, self.cfg)
2207

    
2208

    
2209
class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    # double-check with the node: the config may be stale
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        # FIX: previously this used self.op.pnode, an attribute that does
        # not exist on the reinstall opcode, so the error path raised
        # AttributeError instead of the intended message
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node"  % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      # always release the disks, even if the OS scripts failed
      _ShutdownInstanceDisks(inst, self.cfg)
2295

    
2296

    
2297
class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    # double-check with the node: the config may be stale
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    # FIX: the docstring previously said "Reinstall the instance", a
    # copy-paste leftover from LUReinstallInstance
    inst = self.instance
    old_name = inst.name

    # remember the old storage dir before the rename changes the disks
    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
                                                old_file_storage_dir,
                                                new_file_storage_dir)

      if not result:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                          "sda", "sdb"):
        # the config rename already happened, so only warn here
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)
2400

    
2401

    
2402
class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]

  def BuildHooksEnv(self):
    """Build hooks env.

    The hooks for instance removal run only on the master node.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    full_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    inst = self.cfg.GetInstanceInfo(full_name)
    if inst is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = inst

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    inst = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (inst.name, inst.primary_node))

    if not rpc.call_instance_shutdown(inst.primary_node, inst):
      if not self.op.ignore_failures:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (inst.name, inst.primary_node))
      feedback_fn("Warning: can't shutdown instance")

    logger.Info("removing block devices for instance %s" % inst.name)

    if not _RemoveDisks(inst, self.cfg):
      if not self.op.ignore_failures:
        raise errors.OpExecError("Can't remove instance's disks")
      feedback_fn("Warning: can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % inst.name)

    self.cfg.RemoveInstance(inst.name)
    # drop the instance's entry from the Ganeti Lock Manager as well
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2461

    
2462

    
2463
class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
                               "admin_state", "admin_ram",
                               "disk_template", "ip", "mac", "bridge",
                               "sda_size", "sdb_size", "vcpus", "tags",
                               "auto_balance",
                               "network_port", "kernel_path", "initrd_path",
                               "hvm_boot_order", "hvm_acpi", "hvm_pae",
                               "hvm_cdrom_image_path", "hvm_nic_type",
                               "hvm_disk_type", "vnc_bind_address"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    # TODO: we could lock instances (and nodes) only if the user asked for
    # dynamic fields. For that we need atomic ways to get info for a group of
    # instances from the config, though.
    if self.op.names:
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        _GetWantedInstances(self, self.op.names)
    else:
      self.needed_locks[locking.LEVEL_INSTANCE] = None # Acquire all

    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'

  def DeclareLocks(self, level):
    # TODO: locking of nodes could be avoided when not querying them
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # This of course is valid only if we locked the instances
    self.wanted = self.acquired_locks[locking.LEVEL_INSTANCE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    wanted_names = self.wanted
    instances = [self.cfg.GetInstanceInfo(name) for name in wanted_names]

    # begin data gathering

    pnodes = frozenset([inst.primary_node for inst in instances])

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      # some requested fields need live data from the primary nodes
      live_data = {}
      node_data = rpc.call_all_instances_info(pnodes)
      for nname in pnodes:
        nresult = node_data[nname]
        if nresult:
          live_data.update(nresult)
        elif nresult == False:
          bad_nodes.append(nname)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in wanted_names])

    # end data gathering

    output = []
    for inst in instances:
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = inst.name
        elif field == "os":
          val = inst.os
        elif field == "pnode":
          val = inst.primary_node
        elif field == "snodes":
          val = list(inst.secondary_nodes)
        elif field == "admin_state":
          val = (inst.status != "down")
        elif field == "oper_state":
          if inst.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(inst.name))
        elif field == "status":
          if inst.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(inst.name))
            if running:
              if inst.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if inst.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "admin_ram":
          val = inst.memory
        elif field == "oper_ram":
          if inst.primary_node in bad_nodes:
            val = None
          elif inst.name in live_data:
            val = live_data[inst.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = inst.disk_template
        elif field == "ip":
          val = inst.nics[0].ip
        elif field == "bridge":
          val = inst.nics[0].bridge
        elif field == "mac":
          val = inst.nics[0].mac
        elif field in ("sda_size", "sdb_size"):
          disk = inst.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        elif field == "vcpus":
          val = inst.vcpus
        elif field == "tags":
          val = list(inst.GetTags())
        elif field in ("network_port", "kernel_path", "initrd_path",
                       "hvm_boot_order", "hvm_acpi", "hvm_pae",
                       "hvm_cdrom_image_path", "hvm_nic_type",
                       "hvm_disk_type", "vnc_bind_address"):
          val = getattr(inst, field, None)
          if val is not None:
            pass
          elif field in ("hvm_nic_type", "hvm_disk_type",
                         "kernel_path", "initrd_path"):
            val = "default"
          else:
            val = "-"
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output
2619

    
2620

    
2621
class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  Moves an instance from its primary node to its (single) secondary
  node; only valid for network-mirrored (DRBD) disk templates.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  # no Big Ganeti Lock needed: we lock only the instance and its nodes
  REQ_BGL = False

  def ExpandNames(self):
    """Expand the instance name and declare the locks we need."""
    self._ExpandAndLockInstance()
    # node locks are filled in later (DeclareLocks), once the instance
    # configuration is available
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'

  def DeclareLocks(self, level):
    """Lock the primary and secondary nodes of the locked instance."""
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    # hooks run on the master plus all secondary nodes
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    Raises:
      errors.OpPrereqError: if the disk template is not network-mirrored,
        the target node lacks memory, or a target bridge is missing

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # failover only makes sense with a network-mirrored disk layout
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
                         instance.name, instance.memory)

    # check bridge existance
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        # degraded target disks only abort the failover for a running
        # instance, and only if the user did not ask to ignore them
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      # with ignore_consistency a failed shutdown is only logged (the
      # source node may be dead), otherwise it aborts the failover
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down"  %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logger.Info("Starting instance %s on node %s" %
                  (instance.name, target_node))

      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                               ignore_secondaries=True)
      if not disks_ok:
        # clean up whatever was activated before aborting
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices: children are created depth-first,
  so every child exists before its parent device is assembled.

  Returns:
    True on full success, False as soon as any creation fails

  """
  for child in (device.children or []):
    if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
      return False

  cfg.SetDiskID(device, node)
  created_id = rpc.call_blockdev_create(node, device, device.size,
                                        instance.name, True, info)
  if not created_id:
    return False
  # remember the physical id the node reported, but never overwrite an
  # already-known one
  if device.physical_id is None:
    device.physical_id = created_id
  return True
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  Returns:
    True on full success, False as soon as any creation fails

  """
  # once any device in the tree requires creation on secondaries, all
  # devices below it are created too
  force = device.CreateOnSecondary() or force
  for child in (device.children or []):
    if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                      child, force, info):
      return False

  if not force:
    # nothing to create at this level
    return True
  cfg.SetDiskID(device, node)
  created_id = rpc.call_blockdev_create(node, device, device.size,
                                        instance.name, False, info)
  if not created_id:
    return False
  # keep the first physical id the node reported
  if device.physical_id is None:
    device.physical_id = created_id
  return True
def _GenerateUniqueNames(cfg, exts):
2794
  """Generate a suitable LV name.
2795

2796
  This will generate a logical volume name for the given instance.
2797

2798
  """
2799
  results = []
2800
  for val in exts:
2801
    new_id = cfg.GenerateUniqueID()
2802
    results.append("%s%s" % (new_id, val))
2803
  return results
2804

    
2805

    
2806
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
  """Generate a drbd8 device complete with its children.

  Builds the data LV (full size) and the 128 MB metadata LV, then wraps
  them in a DRBD8 disk object connecting the two given nodes.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  data_child = objects.Disk(dev_type=constants.LD_LV, size=size,
                            logical_id=(vgname, names[0]))
  meta_child = objects.Disk(dev_type=constants.LD_LV, size=128,
                            logical_id=(vgname, names[1]))
  return objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                      logical_id=(primary, secondary, port),
                      children=[data_child, meta_child],
                      iv_name=iv_name)
def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  Args:
    cfg: the configuration object
    template_name: one of the constants.DT_* disk templates
    instance_name: name of the instance (used in the DRBD case indirectly
      via unique-name generation)
    primary_node: the instance's primary node
    secondary_nodes: list of secondary nodes (must match the template:
      empty for plain/file/diskless, exactly one for drbd8)
    disk_sz, swap_sz: sizes in MB for the sda/sdb devices
    file_storage_dir: directory for file-backed disks
    file_driver: driver for file-backed disks

  Returns:
    list of objects.Disk for the template (empty for diskless)

  Raises:
    errors.ProgrammerError: on a secondary-node count that does not
      match the template, or an unknown template name

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
    # no disks at all
    disks = []
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    # two plain LVs: sda (data) and sdb (swap)
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name = "sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name = "sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    # each DRBD branch needs a data LV and a metadata LV
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                         disk_sz, names[0:2], "sda")
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                         swap_sz, names[2:4], "sdb")
    disks = [drbd_sda_dev, drbd_sdb_dev]
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    # file-backed disks live under file_storage_dir with fixed names
    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
                                iv_name="sda", logical_id=(file_driver,
                                "%s/sda" % file_storage_dir))
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
                                iv_name="sdb", logical_id=(file_driver,
                                "%s/sdb" % file_storage_dir))
    disks = [file_sda_dev, file_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks
def _GetInstanceInfoText(instance):
2875
  """Compute that text that should be added to the disk's metadata.
2876

2877
  """
2878
  return "originstname+%s" % instance.name
2879

    
2880

    
2881
def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  For file-based instances the storage directory is created first on
  the primary node; then every disk tree is created on the secondary
  nodes and finally on the primary node.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    # all file disks share one directory; derive it from the first disk
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = rpc.call_file_storage_dir_create(instance.primary_node,
                                              file_storage_dir)

    # a false-ish result means the node itself was unreachable
    if not result:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False

    # result[0] is the per-call success flag from the remote node
    if not result[0]:
      logger.Error("failed to create directory '%s'" % file_storage_dir)
      return False

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    #HARDCODE
    # secondaries first, so DRBD can attach when the primary comes up
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False

  return True
def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal proces

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    # walk the whole device tree on every node that holds part of it
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        # best-effort: log and keep removing the remaining devices
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False

  if instance.disk_template == constants.DT_FILE:
    # also remove the shared file-storage directory on the primary
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
                                            file_storage_dir):
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      result = False

  return result
def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group

  This is currently hard-coded for the two-drive layout.

  Returns the MB of volume-group space needed by the template, or None
  for templates that use no VG space (diskless, file).

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    # 256 MB are added for drbd metadata, 128MB for each drbd device
    constants.DT_DRBD8: disk_size + swap_size + 256,
    constants.DT_FILE: None,
  }

  try:
    return req_size_dict[disk_template]
  except KeyError:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" %  disk_template)
class LUCreateInstance(LogicalUnit):
2988
  """Create an instance.
2989

2990
  """
2991
  HPATH = "instance-add"
2992
  HTYPE = constants.HTYPE_INSTANCE
2993
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
2994
              "disk_template", "swap_size", "mode", "start", "vcpus",
2995
              "wait_for_sync", "ip_check", "mac"]
2996

    
2997
  def _RunAllocator(self):
2998
    """Run the allocator based on input opcode.
2999

3000
    """
3001
    disks = [{"size": self.op.disk_size, "mode": "w"},
3002
             {"size": self.op.swap_size, "mode": "w"}]
3003
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3004
             "bridge": self.op.bridge}]
3005
    ial = IAllocator(self.cfg, self.sstore,
3006
                     mode=constants.IALLOCATOR_MODE_ALLOC,
3007
                     name=self.op.instance_name,
3008
                     disk_template=self.op.disk_template,
3009
                     tags=[],
3010
                     os=self.op.os_type,
3011
                     vcpus=self.op.vcpus,
3012
                     mem_size=self.op.mem_size,
3013
                     disks=disks,
3014
                     nics=nics,
3015
                     )
3016

    
3017
    ial.Run(self.op.iallocator)
3018

    
3019
    if not ial.success:
3020
      raise errors.OpPrereqError("Can't compute nodes using"
3021
                                 " iallocator '%s': %s" % (self.op.iallocator,
3022
                                                           ial.info))
3023
    if len(ial.nodes) != ial.required_nodes:
3024
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3025
                                 " of nodes (%s), required %s" %
3026
                                 (len(ial.nodes), ial.required_nodes))
3027
    self.op.pnode = ial.nodes[0]
3028
    logger.ToStdout("Selected nodes for the instance: %s" %
3029
                    (", ".join(ial.nodes),))
3030
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3031
                (self.op.instance_name, self.op.iallocator, ial.nodes))
3032
    if ial.required_nodes == 2:
3033
      self.op.snode = ial.nodes[1]
3034

    
3035
  def BuildHooksEnv(self):
3036
    """Build hooks env.
3037

3038
    This runs on master, primary and secondary nodes of the instance.
3039

3040
    """
3041
    env = {
3042
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3043
      "INSTANCE_DISK_SIZE": self.op.disk_size,
3044
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
3045
      "INSTANCE_ADD_MODE": self.op.mode,
3046
      }
3047
    if self.op.mode == constants.INSTANCE_IMPORT:
3048
      env["INSTANCE_SRC_NODE"] = self.op.src_node
3049
      env["INSTANCE_SRC_PATH"] = self.op.src_path
3050
      env["INSTANCE_SRC_IMAGE"] = self.src_image
3051

    
3052
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3053
      primary_node=self.op.pnode,
3054
      secondary_nodes=self.secondaries,
3055
      status=self.instance_status,
3056
      os_type=self.op.os_type,
3057
      memory=self.op.mem_size,
3058
      vcpus=self.op.vcpus,
3059
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3060
    ))
3061

    
3062
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3063
          self.secondaries)
3064
    return env, nl, nl
3065

    
3066

    
3067
  def CheckPrereq(self):
3068
    """Check prerequisites.
3069

3070
    """
3071
    # set optional parameters to none if they don't exist
3072
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3073
                 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
3074
                 "hvm_nic_type", "hvm_disk_type", "vnc_bind_address"]:
3075
      if not hasattr(self.op, attr):
3076
        setattr(self.op, attr, None)
3077

    
3078
    if self.op.mode not in (constants.INSTANCE_CREATE,
3079
                            constants.INSTANCE_IMPORT):
3080
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3081
                                 self.op.mode)
3082

    
3083
    if (not self.cfg.GetVGName() and
3084
        self.op.disk_template not in constants.DTS_NOT_LVM):
3085
      raise errors.OpPrereqError("Cluster does not support lvm-based"
3086
                                 " instances")
3087

    
3088
    if self.op.mode == constants.INSTANCE_IMPORT:
3089
      src_node = getattr(self.op, "src_node", None)
3090
      src_path = getattr(self.op, "src_path", None)
3091
      if src_node is None or src_path is None:
3092
        raise errors.OpPrereqError("Importing an instance requires source"
3093
                                   " node and path options")
3094
      src_node_full = self.cfg.ExpandNodeName(src_node)
3095
      if src_node_full is None:
3096
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3097
      self.op.src_node = src_node = src_node_full
3098

    
3099
      if not os.path.isabs(src_path):
3100
        raise errors.OpPrereqError("The source path must be absolute")
3101

    
3102
      export_info = rpc.call_export_info(src_node, src_path)
3103

    
3104
      if not export_info:
3105
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
3106

    
3107
      if not export_info.has_section(constants.INISECT_EXP):
3108
        raise errors.ProgrammerError("Corrupted export config")
3109

    
3110
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
3111
      if (int(ei_version) != constants.EXPORT_VERSION):
3112
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3113
                                   (ei_version, constants.EXPORT_VERSION))
3114

    
3115
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3116
        raise errors.OpPrereqError("Can't import instance with more than"
3117
                                   " one data disk")
3118

    
3119
      # FIXME: are the old os-es, disk sizes, etc. useful?
3120
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3121
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3122
                                                         'disk0_dump'))
3123
      self.src_image = diskimage
3124
    else: # INSTANCE_CREATE
3125
      if getattr(self.op, "os_type", None) is None:
3126
        raise errors.OpPrereqError("No guest OS specified")
3127

    
3128
    #### instance parameters check
3129

    
3130
    # disk template and mirror node verification
3131
    if self.op.disk_template not in constants.DISK_TEMPLATES:
3132
      raise errors.OpPrereqError("Invalid disk template name")
3133

    
3134
    # instance name verification
3135
    hostname1 = utils.HostInfo(self.op.instance_name)
3136

    
3137
    self.op.instance_name = instance_name = hostname1.name
3138
    instance_list = self.cfg.GetInstanceList()
3139
    if instance_name in instance_list:
3140
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3141
                                 instance_name)
3142

    
3143
    # ip validity checks
3144
    ip = getattr(self.op, "ip", None)
3145
    if ip is None or ip.lower() == "none":
3146
      inst_ip = None
3147
    elif ip.lower() == "auto":
3148
      inst_ip = hostname1.ip
3149
    else:
3150
      if not utils.IsValidIP(ip):
3151
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
3152
                                   " like a valid IP" % ip)
3153
      inst_ip = ip
3154
    self.inst_ip = self.op.ip = inst_ip
3155

    
3156
    if self.op.start and not self.op.ip_check:
3157
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3158
                                 " adding an instance in start mode")
3159

    
3160
    if self.op.ip_check:
3161
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3162
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3163
                                   (hostname1.ip, instance_name))
3164

    
3165
    # MAC address verification
3166
    if self.op.mac != "auto":
3167
      if not utils.IsValidMac(self.op.mac.lower()):
3168
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
3169
                                   self.op.mac)
3170

    
3171
    # bridge verification
3172
    bridge = getattr(self.op, "bridge", None)
3173
    if bridge is None:
3174
      self.op.bridge = self.cfg.GetDefBridge()
3175
    else:
3176
      self.op.bridge = bridge
3177

    
3178
    # boot order verification
3179
    if self.op.hvm_boot_order is not None:
3180
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3181
        raise errors.OpPrereqError("invalid boot order specified,"
3182
                                   " must be one or more of [acdn]")
3183
    # file storage checks
3184
    if (self.op.file_driver and
3185
        not self.op.file_driver in constants.FILE_DRIVER):
3186
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3187
                                 self.op.file_driver)
3188

    
3189
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3190
      raise errors.OpPrereqError("File storage directory not a relative"
3191
                                 " path")
3192
    #### allocator run
3193

    
3194
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
3195
      raise errors.OpPrereqError("One and only one of iallocator and primary"
3196
                                 " node must be given")
3197

    
3198
    if self.op.iallocator is not None:
3199
      self._RunAllocator()
3200

    
3201
    #### node related checks
3202

    
3203
    # check primary node
3204
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3205
    if pnode is None:
3206
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
3207
                                 self.op.pnode)
3208
    self.op.pnode = pnode.name
3209
    self.pnode = pnode
3210
    self.secondaries = []
3211

    
3212
    # mirror node verification
3213
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3214
      if getattr(self.op, "snode", None) is None:
3215
        raise errors.OpPrereqError("The networked disk templates need"
3216
                                   " a mirror node")
3217

    
3218
      snode_name = self.cfg.ExpandNodeName(self.op.snode)
3219
      if snode_name is None:
3220
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
3221
                                   self.op.snode)
3222
      elif snode_name == pnode.name:
3223
        raise errors.OpPrereqError("The secondary node cannot be"
3224
                                   " the primary node.")
3225
      self.secondaries.append(snode_name)
3226

    
3227
    req_size = _ComputeDiskSize(self.op.disk_template,
3228
                                self.op.disk_size, self.op.swap_size)
3229

    
3230
    # Check lv size requirements
3231
    if req_size is not None:
3232
      nodenames = [pnode.name] + self.secondaries
3233
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3234
      for node in nodenames:
3235
        info = nodeinfo.get(node, None)
3236
        if not info:
3237
          raise errors.OpPrereqError("Cannot get current information"
3238
                                     " from node '%s'" % node)
3239
        vg_free = info.get('vg_free', None)
3240
        if not isinstance(vg_free, int):
3241
          raise errors.OpPrereqError("Can't compute free disk space on"
3242
                                     " node %s" % node)
3243
        if req_size > info['vg_free']:
3244
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3245
                                     " %d MB available, %d MB required" %
3246
                                     (node, info['vg_free'], req_size))
3247

    
3248
    # os verification
3249
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3250
    if not os_obj:
3251
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3252
                                 " primary node"  % self.op.os_type)
3253

    
3254
    if self.op.kernel_path == constants.VALUE_NONE:
3255
      raise errors.OpPrereqError("Can't set instance kernel to none")
3256

    
3257

    
3258
    # bridge check on primary node
3259
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3260
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3261
                                 " destination node '%s'" %
3262
                                 (self.op.bridge, pnode.name))
3263

    
3264
    # memory check on primary node
3265
    if self.op.start:
3266
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3267
                           "creating instance %s" % self.op.instance_name,
3268
                           self.op.mem_size)
3269

    
3270
    # hvm_cdrom_image_path verification
3271
    if self.op.hvm_cdrom_image_path is not None:
3272
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
3273
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
3274
                                   " be an absolute path or None, not %s" %
3275
                                   self.op.hvm_cdrom_image_path)
3276
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
3277
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
3278
                                   " regular file or a symlink pointing to"
3279
                                   " an existing regular file, not %s" %
3280
                                   self.op.hvm_cdrom_image_path)
3281

    
3282
    # vnc_bind_address verification
3283
    if self.op.vnc_bind_address is not None:
3284
      if not utils.IsValidIP(self.op.vnc_bind_address):
3285
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3286
                                   " like a valid IP address" %
3287
                                   self.op.vnc_bind_address)
3288

    
3289
    # Xen HVM device type checks
3290
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
3291
      if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
3292
        raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
3293
                                   " hypervisor" % self.op.hvm_nic_type)
3294
      if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
3295
        raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
3296
                                   " hypervisor" % self.op.hvm_disk_type)
3297

    
3298
    if self.op.start:
3299
      self.instance_status = 'up'
3300
    else:
3301
      self.instance_status = 'down'
3302

    
3303
  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    Builds the NIC and disk objects, creates the disks on the nodes,
    registers the instance in the configuration and the lock manager,
    optionally waits for disk sync, runs the OS create/import scripts
    and finally starts the instance if requested.  On disk-creation or
    sync failure the partially-created state is rolled back before an
    OpExecError is raised.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    # MAC address: either generated by the cluster or given explicitly
    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    # some hypervisor types need a network port allocated (per
    # constants.HTS_REQ_PORT), e.g. for VNC consoles
    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    if self.op.vnc_bind_address is None:
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.sstore.GetFileStorageDir(),
                                        string_file_storage_dir, instance))

    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            hvm_acpi=self.op.hvm_acpi,
                            hvm_pae=self.op.hvm_pae,
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
                            vnc_bind_address=self.op.vnc_bind_address,
                            hvm_nic_type=self.op.hvm_nic_type,
                            hvm_disk_type=self.op.hvm_disk_type,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      # roll back any devices that were created before the failure
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Add the new instance to the Ganeti Lock Manager
    self.context.glm.add(locking.LEVEL_INSTANCE, instance)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      # degraded disks: undo the disk creation, the config entry and
      # the lock-manager registration done above
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      # Remove the new instance from the Ganeti Lock Manager
      self.context.glm.remove(locking.LEVEL_INSTANCE, iobj.name)
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                                src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")
3426

    
3427

    
3428
class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    # only the instance lock is required
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    inst = self.instance
    primary = inst.primary_node

    # ask the primary node which instances it is running
    running = rpc.call_instance_list([primary])[primary]
    if running is False:
      raise errors.OpExecError("Can't connect to node %s." % primary)

    if inst.name not in running:
      raise errors.OpExecError("Instance %s is not running." % inst.name)

    logger.Debug("connecting to console of %s on %s" % (inst.name, primary))

    # ask the hypervisor for the console command, then wrap it in an
    # ssh command line to be executed on the master node
    console_cmd = hypervisor.GetHypervisor().GetShellCommandForConsole(inst)

    return self.ssh.BuildCmd(primary, "root", console_cmd,
                             batch=True, tty=True)
3473

    
3474

    
3475
class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    Runs the configured instance allocator in relocation mode for this
    instance, moving away from the current secondary node, and stores
    the chosen node name in self.op.remote_node.

    Returns:
      the name of the selected new secondary node

    Raises:
      errors.OpPrereqError: if the allocator fails or returns an
        unexpected number of nodes

    """
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      # FIX: the format string has three placeholders but the allocator
      # name was missing from the argument tuple, which turned this
      # branch into a TypeError instead of the intended OpPrereqError
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    logger.ToStdout("Selected new secondary for the instance: %s" %
                    self.op.remote_node)
    # FIX: previously this returned None, so a caller doing
    # "self.op.remote_node = self._RunAllocator()" clobbered the
    # selection made above; return it explicitly
    return self.op.remote_node
3505

    
3506
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    environ = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    # the generic per-instance variables are merged on top
    environ.update(_BuildInstanceHookEnvByObject(self.instance))

    # hooks run on the master, the primary, and (if set) the new secondary
    nodes = [self.sstore.GetMasterNode(), self.instance.primary_node]
    if self.op.remote_node is not None:
      nodes.append(self.op.remote_node)
    return environ, nodes, nodes
3525

    
3526
  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster, uses a
    network-mirrored disk template with exactly one secondary, and
    validates the requested replacement target (an explicit remote
    node or an iallocator-computed one), normalizing self.op.mode and
    setting self.tgt_node/self.oth_node/self.new_node for Exec.

    """
    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    self.op.instance_name = instance.name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      # FIX: _RunAllocator stores its choice in self.op.remote_node
      # itself; the old code assigned the method's (None) return value
      # back to self.op.remote_node, discarding the allocator's result
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    # all requested disks must actually exist on the instance
    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
    self.op.remote_node = remote_node
3609

    
3610
  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaces.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    # iv_name -> (drbd disk object, its old LVs, its new LVs); used by
    # the post-sync check and the final cleanup steps
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      # only the disks named in the opcode are replaced
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      # the peer must be consistent, since it will be the only good
      # copy of the data while the target node's LVs are rebuilt
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      # one data LV and one (fixed 128MB) meta LV per replaced disk
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      # keep the config objects in sync with the on-node renames
      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        # best-effort rollback: try to remove the LVs we just created
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      # index 5 of the blockdev_find result is used as the degraded
      # flag here -- presumably part of the drbd status tuple; TODO
      # confirm against the backend implementation
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        # removal failures are non-fatal: warn and continue
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue
3779

    
3780
  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    # iv_name -> (drbd disk object, its LV children); used by the
    # post-sync check and the old-storage cleanup
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s on %s" % (dev.iv_name, pri_node))
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find device %s on node %s" %
                                 (dev.iv_name, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
      # ldisk=True: the primary's local disk must be fully consistent,
      # as it will be the only good copy while the secondary is moved
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      size = dev.size
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))

      iv_names[dev.iv_name] = (dev, dev.children)

    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for dev in instance.disks:
      size = dev.size
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
      # create new devices on new_node
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=(pri_node, new_node,
                                          dev.logical_id[2]),
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
                                        new_drbd, False,
                                      _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new DRBD on"
                                 " node '%s'" % new_node)

    for dev in instance.disks:
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for %s on old node" % dev.iv_name)
      cfg.SetDiskID(dev, old_node)
      # shutdown failures are non-fatal at this point; warn and go on
      if not rpc.call_blockdev_shutdown(old_node, dev):
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    done = 0
    for dev in instance.disks:
      cfg.SetDiskID(dev, pri_node)
      # set the physical (unique in bdev terms) id to None, meaning
      # detach from network
      dev.physical_id = (None,) * len(dev.physical_id)
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
      if rpc.call_blockdev_find(pri_node, dev):
        done += 1
      else:
        warning("Failed to detach drbd %s from network, unusual case" %
                dev.iv_name)

    if not done:
      # no detaches succeeded (very unlikely)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev in instance.disks:
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    failures = []
    for dev in instance.disks:
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
      # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      # is correct
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
                "please do a gnt-instance info to see the status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      # index 5 of the blockdev_find result is used as the degraded
      # flag -- presumably part of the drbd status tuple; TODO confirm
      # against the backend implementation
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        # removal failures are non-fatal: warn and leave for manual cleanup
        if not rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")
3943

    
3944
  def Exec(self, feedback_fn):
3945
    """Execute disk replacement.
3946

3947
    This dispatches the disk replacement to the appropriate handler.
3948

3949
    """
3950
    instance = self.instance
3951

    
3952
    # Activate the instance disks if we're replacing them on a down instance
3953
    if instance.status == "down":
3954
      op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
3955
      self.proc.ChainOpCode(op)
3956

    
3957
    if instance.disk_template == constants.DT_DRBD8:
3958
      if self.op.remote_node is None:
3959
        fn = self._ExecD8DiskOnly
3960
      else:
3961
        fn = self._ExecD8Secondary
3962
    else:
3963
      raise errors.ProgrammerError("Unhandled disk replacement case")
3964

    
3965
    ret = fn(feedback_fn)
3966