Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib.py @ 4300c4b6

History | View | Annotate | Download (175.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33

    
34
from ganeti import rpc
35
from ganeti import ssh
36
from ganeti import logger
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import locking
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import serializer
45

    
46

    
47
class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_MASTER: the LU needs to run on the master node
        REQ_WSSTORE: the LU needs a writable SimpleStore
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  # Hooks path and type; None HPATH means BuildHooksEnv is never called
  HPATH = None
  HTYPE = None
  # Names of opcode attributes that must be present (and not None)
  _OP_REQP = []
  REQ_MASTER = True
  REQ_WSSTORE = False
  REQ_BGL = True

  def __init__(self, processor, op, context, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    Raises errors.OpPrereqError if a required opcode parameter is
    missing, the cluster is not initialized, or (when REQ_MASTER is
    set) we are not running on the master node.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.sstore = sstore
    self.context = context
    # needed_locks stays None until ExpandNames populates it; a dict of
    # {lock-level: lock-names} afterwards
    self.needed_locks = None
    self.acquired_locks = {}
    # Per-level flag: 0 = exclusive (default), truthy = shared
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    # Lazily-created SshRunner, see __GetSSH below
    self.__ssh = None

    # Validate that all declared-required opcode parameters are present
    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    if not self.cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    if self.REQ_MASTER:
      master = sstore.GetMasterNode()
      if master != utils.HostInfo().name:
        raise errors.OpPrereqError("Commands must be run on the master"
                                   " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object, creating it on first access.

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.sstore)
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:
      - Use an empty dict if you don't need any lock
      - If you don't need any lock at a particular level omit that level
      - Don't put anything for the BGL level
      - If you want all locks at a level use None as a value
        (this reflects what LockSet does, and will be replaced before
        CheckPrereq with the full list of nodes that have been locked)

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples:
    # Acquire all nodes and one instance
    self.needed_locks = {
      locking.LEVEL_NODE: None,
      locking.LEVEL_INSTANCES: ['instance1.example.tld'],
    }
    # Acquire just two nodes
    self.needed_locks = {
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
    }
    # Acquire no locks
    self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-node tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    Args:
      phase: the hooks phase that has just been run
      hook_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: the previous result this LU had, or None in the PRE phase.

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    Raises errors.OpPrereqError if the instance name cannot be expanded.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    If should be called in DeclareLocks in a way similar to:

    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we're really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      wanted_nodes.extend(instance.secondary_nodes)
    self.needed_locks[locking.LEVEL_NODE] = wanted_nodes

    # Consume the recalculation request so a second call without a new
    # request trips the assertion above.
    del self.recalculate_locks[locking.LEVEL_NODE]
302

    
303

    
304
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  # A None HPATH means the hooks machinery never calls BuildHooksEnv
  HPATH = None
  HTYPE = None
313

    
314

    
315
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  Raises errors.OpPrereqError if the argument is not a list or a name
  cannot be expanded.

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    # An empty list means "all nodes in the cluster"
    return utils.NiceSort(lu.cfg.GetNodeList())

  wanted = []
  for name in nodes:
    expanded = lu.cfg.ExpandNodeName(name)
    if expanded is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(expanded)
  return utils.NiceSort(wanted)
337

    
338

    
339
def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  Raises errors.OpPrereqError if the argument is not a list or a name
  cannot be expanded.

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if not instances:
    # An empty list means "all instances in the cluster"
    return utils.NiceSort(lu.cfg.GetInstanceList())

  wanted = []
  for name in instances:
    expanded = lu.cfg.ExpandInstanceName(name)
    if expanded is None:
      raise errors.OpPrereqError("No such instance name '%s'" % name)
    wanted.append(expanded)
  return utils.NiceSort(wanted)
361

    
362

    
363
def _CheckOutputFields(static, dynamic, selected):
364
  """Checks whether all selected fields are valid.
365

366
  Args:
367
    static: Static fields
368
    dynamic: Dynamic fields
369

370
  """
371
  static_fields = frozenset(static)
372
  dynamic_fields = frozenset(dynamic)
373

    
374
  all_fields = static_fields | dynamic_fields
375

    
376
  if not all_fields.issuperset(selected):
377
    raise errors.OpPrereqError("Unknown output fields selected: %s"
378
                               % ",".join(frozenset(selected).
379
                                          difference(all_fields)))
380

    
381

    
382
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
383
                          memory, vcpus, nics):
384
  """Builds instance related env variables for hooks from single variables.
385

386
  Args:
387
    secondary_nodes: List of secondary nodes as strings
388
  """
389
  env = {
390
    "OP_TARGET": name,
391
    "INSTANCE_NAME": name,
392
    "INSTANCE_PRIMARY": primary_node,
393
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
394
    "INSTANCE_OS_TYPE": os_type,
395
    "INSTANCE_STATUS": status,
396
    "INSTANCE_MEMORY": memory,
397
    "INSTANCE_VCPUS": vcpus,
398
  }
399

    
400
  if nics:
401
    nic_count = len(nics)
402
    for idx, (ip, bridge, mac) in enumerate(nics):
403
      if ip is None:
404
        ip = ""
405
      env["INSTANCE_NIC%d_IP" % idx] = ip
406
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
407
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
408
  else:
409
    nic_count = 0
410

    
411
  env["INSTANCE_NIC_COUNT"] = nic_count
412

    
413
  return env
414

    
415

    
416
def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override

  Returns the dict produced by _BuildInstanceHookEnv.

  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    # Fixed copy-paste bug: 'status' previously read instance.os, so the
    # INSTANCE_STATUS hook variable carried the OS name instead of the
    # instance's run status.
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)
436

    
437

    
438
def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  Raises errors.OpPrereqError if any bridge used by the instance's NICs
  is missing on its primary node.

  """
  # Ask the primary node whether every bridge referenced by a NIC exists
  node = instance.primary_node
  bridges = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(node, bridges):
    raise errors.OpPrereqError("one or more target bridges %s does not"
                               " exist on destination node '%s'" %
                               (bridges, node))
448

    
449

    
450
class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    # The only node left must be the master itself
    nodes = self.cfg.GetNodeList()
    if nodes != [master]:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodes) - 1))
    instances = self.cfg.GetInstanceList()
    if instances:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instances))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    Returns the name of the (former) master node.

    """
    master = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")
    # Keep backup copies of the root ssh key files before the cluster
    # goes away
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    for key_file in (priv_key, pub_key):
      utils.CreateBackup(key_file)
    return master
486

    
487

    
488
class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existance and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    Returns True if any check failed ("bad"), False otherwise.

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existance and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      # CheckVolumeGroupSize returns an error message or a false value
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        # NOTE(review): the loop variable shadows the 'node' parameter
        # from here on; safe only because the parameter is not used below
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        # NOTE(review): again shadows the 'node' parameter
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result['node-net-test'][node]))

    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    Returns True if any check failed ("bad"), False otherwise.

    """
    bad = False

    node_current = instanceconfig.primary_node

    # Map of node -> logical volumes this instance should have there
    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    # An instance not marked 'down' must be running on its primary node
    if not instanceconfig.status == 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    # The instance must not be running anywhere else
    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    Returns True if any orphan volume was found ("bad"), False otherwise.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    Returns True if any orphan instance was found ("bad"), False otherwise.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    Returns True if any node lacks failover memory ("bad"), False otherwise.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          needed_mem += instance_cfg[instance].memory
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    Returns True if the cluster verified cleanly, False otherwise.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    # NOTE(review): 'nodeinfo' starts as a list of node objects but is
    # rebound to a per-node dict inside the loop below
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo]
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      # A string result carries an LVM error message from the node
      if isinstance(volumeinfo, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result =  self._VerifyInstance(instance, inst_config, node_volume,
                                     node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result, handle it, and send some
    nicely-formatted feedback back to the user.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: previous Exec result

    NOTE(review): for phases other than POST this method falls off the
    end and returns None rather than lu_result — confirm callers only
    use the POST return value.

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn("    Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result
902

    
903

    
904
class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    Returns:
      A four-element tuple (also aliased as `result`):
        - list of nodes whose volume data could not be retrieved at all
        - dict of node -> LVM error message string
        - list of instance names having offline LVs
        - dict of instance name -> list of missing (node, lv_name) pairs

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    # build the expected set of (node, lv_name) -> instance mappings;
    # only running, network-mirrored instances are of interest
    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
        # Bug fix: a string result is an error message, not an LV map;
        # without this 'continue' the iteritems() call below would fail
        # with an AttributeError on the string
        continue
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result
974

    
975

    
976
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  Changes the cluster name and/or master IP in the simple store and
  redistributes the updated ssconf files to all nodes, temporarily
  stopping the master role while the change is made.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]
  REQ_WSSTORE = True

  def BuildHooksEnv(self):
    """Build hooks env.

    Exposes the old name (OP_TARGET) and the new name; hooks run only
    on the master node.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    Resolves the new name, rejects a no-op rename (same name and IP),
    and refuses a new IP that is already reachable on the network.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      # a reachable new IP most likely means an address conflict
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    # store the fully-resolved name for Exec
    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        # the master already has the updated files locally
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          # best-effort distribution: log failures but keep going
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      # always try to restart the master role, even if the rename or
      # the distribution above failed
      if not rpc.call_node_start_master(master, False):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")
1053

    
1054

    
1055
def _RecursiveCheckIfLVMBased(disk):
  """Check whether a disk, or any disk below it, is LVM-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether a LD_LV dev_type was found or not

  """
  # depth-first scan of the children, then the disk itself
  for child in (disk.children or []):
    if _RecursiveCheckIfLVMBased(child):
      return True
  return disk.dev_type == constants.LD_LV
1070

    
1071

    
1072
class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    Runs on the master node only, before and after the change.

    """
    master = self.sstore.GetMasterNode()
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    return env, [master], [master]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if not self.op.vg_name:
      # disabling LVM storage: refuse while LVM-backed instances exist
      all_instances = [self.cfg.GetInstanceInfo(name)
                       for name in self.cfg.GetInstanceList()]
      for inst in all_instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      node_list = self.cfg.GetNodeList()
      vglist = rpc.call_vg_list(node_list)
      for node_name in node_list:
        vgstatus = utils.CheckVolumeGroupSize(vglist[node_name],
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node_name, vgstatus))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    new_vg = self.op.vg_name
    if new_vg == self.cfg.GetVGName():
      # nothing to do, just tell the user so
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")
    else:
      self.cfg.SetVGName(new_vg)
1127

    
1128

    
1129
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  Repeatedly queries the primary node for the mirror status of every
  disk of the instance and reports progress via `proc`, until all
  disks report no sync percentage (i.e. sync finished), or only once
  if oneshot is True.

  Args:
    cfgw: configuration object, used to set the disks' physical IDs
    instance: the instance whose disks are being polled
    proc: object providing LogInfo/LogWarning for user feedback
    unlock: unused here  # NOTE(review): parameter is never read -- confirm
                         # whether callers still expect it

  Returns:
    boolean, True if no disk ended up degraded, False otherwise

  Raises:
    errors.RemoteError: if mirror data can't be fetched from the
      primary node ten times in a row

  """
  if not instance.disks:
    # no disks: trivially in sync
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  # attach the physical IDs for the primary node before querying it
  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      # RPC failure: retry a bounded number of times before giving up
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        # a percentage is reported only while the disk is still syncing
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    # sleep until the next poll, at most one minute
    time.sleep(min(60, max_time))

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded
1185

    
1186

    
1187
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  Args:
    cfgw: configuration object, used to set the device's physical ID
    dev: the ganeti.objects.Disk to check
    node: name of the node to query
    on_primary: whether the device is expected on its primary node
      (secondary nodes are only queried if the device type assembles
      on secondaries too)
    ldisk: select the local-storage status field instead of the
      overall degradation field of the blockdev_find result

  Returns:
    boolean, True if the device (and all its children) look healthy

  """
  cfgw.SetDiskID(dev, node)
  if ldisk:
    idx = 6  # position of the ldisk status in the blockdev_find result
  else:
    idx = 5  # position of the is_degraded flag

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      # no answer counts as degraded
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      # NOTE(review): the ldisk flag is not propagated here, so children
      # are always checked with ldisk=False -- confirm this is intended
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result
1214

    
1215

    
1216
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
    _CheckOutputFields(static=[],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into an a per-os per-node dictionary

      Args:
        node_list: a list with the names of all nodes
        rlist: a map with node names as keys and OS objects as values

      Returns:
        map: a map with osnames as keys and as value another map, with
             nodes as
             keys and list of OS objects as values
             e.g. {"debian-etch": {"node1": [<object>,...],
                                   "node2": [<object>,]}
                  }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if not nr:
        # this node returned nothing; any OS it should have reported
        # will simply keep an empty list for it
        continue
      for os_obj in nr:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    # Fixed: singleton comparison should use 'is' rather than '=='
    # (PEP 8); the RPC layer returns the False singleton on failure
    if node_data is False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          # valid only if every node has a non-empty list whose first
          # element is truthy
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output
1294

    
1295

    
1296
class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      # Fixed: use the call form instead of the deprecated
      # 'raise Class, args' statement, for consistency with the rest
      # of this module (the comma form is also invalid in Python 3)
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    # refuse removal while any instance still uses this node
    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    # drop it from the cluster context/configuration first...
    self.context.RemoveNode(node.name)

    # ...then tell the node itself to leave the cluster
    rpc.call_node_leave_cluster(node.name)
1363

    
1364

    
1365
class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    # fields that require a live RPC call to the nodes
    self.dynamic_fields = frozenset([
      "dtotal", "dfree",
      "mtotal", "mnode", "mfree",
      "bootid",
      "ctotal",
      ])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip", "tags"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    # TODO: we could lock nodes only if the user asked for dynamic fields. For
    # that we need atomic ways to get info for a group of nodes from the
    # config, though.
    if not self.op.names:
      self.needed_locks[locking.LEVEL_NODE] = None
    else:
      # fixed: this branch was indented one extra space
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.names)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # This of course is valid only if we locked the nodes
    self.wanted = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      # Fixed: dict.fromkeys(nodenames, {}) made every node share ONE
      # mutable dict; harmless while only read, but a latent aliasing
      # bug -- give each node its own empty dict instead
      live_data = dict([(name, {}) for name in nodenames])

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    # the instance mapping is only needed for these fields
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output
1479

    
1480

    
1481
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    # map each instance to its per-node LV layout so the "instance"
    # column can be answered by reverse lookup
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        # node unreachable or has no volumes: nothing to report
        continue

      # sort a copy by physical device, leaving the RPC result intact
      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        row = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            # find the owning instance, if any
            val = '-'
            for inst in ilist:
              inst_lvs = lv_by_node[inst]
              if node in inst_lvs and vol['name'] in inst_lvs[node]:
                val = inst.name
                break
          else:
            raise errors.ParameterError(field)
          row.append(str(val))

        output.append(row)

    return output
1549

    
1550

    
1551
class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) matches the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    # resolve the new node's name and primary IP via DNS
    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      # single-homed node: secondary defaults to the primary IP
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    # check for IP conflicts with the existing nodes
    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        # a readded node must keep its previous addresses
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachablity
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    # the actual configuration update happens in Exec
    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    # read the six key files in the order listed above
    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      # verify the node can actually reach its own secondary address
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    # run a node-verify from the master against the new node
    node_verify_list = [self.sstore.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
    }

    result = rpc.call_node_verify(node_verify_list, node_verify_param)
    for verifier in node_verify_list:
      if not result[verifier]:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                 " for remote verification" % verifier)
      if result[verifier]['nodelist']:
        for failed in result[verifier]['nodelist']:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, result[verifier]['nodelist'][failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      # the master already has these files
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        # best-effort: log failed copies but do not abort
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    # copy the cluster-wide files to the new node only
    to_copy = self.sstore.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      result = rpc.call_upload_file([node], fname)
      if not result[node]:
        logger.Error("could not copy file %s to node %s" % (fname, node))

    # finally record the node in the cluster context
    if self.op.readd:
      self.context.ReaddNode(new_node)
    else:
      self.context.AddNode(new_node)
1752

    
1753

    
1754
class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  Returns a dict of static cluster facts (versions, master node,
  architecture, hypervisor); no hooks and no locks are needed.

  """
  _OP_REQP = []
  REQ_MASTER = False
  REQ_BGL = False

  def ExpandNames(self):
    # purely informational LU: no locks required at all
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      }

    return result
1788

    
1789

    
1790
class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    # read-only dump of the in-memory config: no locks required
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()
1811

    
1812

    
1813
class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Activate the disks.

    Returns the (node, iv_name, device) mapping produced by
    _AssembleInstanceDisks; raises OpExecError if any device could
    not be assembled.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info
1842

    
1843

    
1844
def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
         succeeded with the mapping from node devices to instance devices
  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occured, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info
1904

    
1905

    
1906
def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  Assembles all instance disks, tolerating secondary-node failures
  when 'force' is true. On failure the already-assembled disks are
  shut down again and OpExecError is raised; 'force is None' is used
  by callers for which the --force hint does not apply.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")
1918

    
1919

    
1920
class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks

    Refuses to shut down the block devices while the instance is
    still running on its primary node.

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    # a non-list result means the RPC to the node failed
    if not isinstance(ins_l, list):
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)
1955

    
1956

    
1957
def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are ignored
  (the original docstring had this condition inverted); secondary-node
  errors always make the function return False.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        # only primary-node failures can be ignored, and only on request
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result
1976

    
1977

    
1978
def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                             " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  # a non-int value means the node could not compute its free memory
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                             " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                             " needed %s MiB, available %s MiB" %
                             (node, reason, requested, free_mem))
2006

    
2007

    
2008
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    # mark up first so the config reflects the intent even if the start fails
    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")
2074

    
2075

    
2076
class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    # validate the reboot type before taking any locks
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # FIXME: lock only primary on (not constants.INSTANCE_REBOOT_FULL)
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      # full reboot: shut the instance and its disks down, then start again
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)
2155

    
2156

    
2157
class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    A failed instance shutdown is only logged (best-effort); the disks
    are shut down regardless.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(instance, self.cfg)
2207

    
2208

    
2209
class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   self.op.pnode)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node"  % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      # persist the new OS before running the create scripts
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      # always release the disks, even if the OS install failed
      _ShutdownInstanceDisks(inst, self.cfg)
2295

    
2296

    
2297
class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    # remember the old file storage dir before the rename changes the config
    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
                                                old_file_storage_dir,
                                                new_file_storage_dir)

      if not result:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                          "sda", "sdb"):
        # a failed rename script is non-fatal: the config rename already
        # happened, so only log the problem
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)
2400

    
2401

    
2402
class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master only.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    if not _RemoveDisks(instance, self.cfg):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)
    # Remove the instance from the Ganeti Lock Manager as well
    self.context.glm.remove(locking.LEVEL_INSTANCE, instance.name)
2461

    
2462

    
2463
class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
                               "admin_state", "admin_ram",
                               "disk_template", "ip", "mac", "bridge",
                               "sda_size", "sdb_size", "vcpus", "tags"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    # TODO: we could lock instances (and nodes) only if the user asked for
    # dynamic fields. For that we need atomic ways to get info for a group of
    # instances from the config, though.
    if not self.op.names:
      self.needed_locks[locking.LEVEL_INSTANCE] = None # Acquire all
    else:
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        _GetWantedInstances(self, self.op.names)

    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'

  def DeclareLocks(self, level):
    # TODO: locking of nodes could be avoided when not querying them
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # This of course is valid only if we locked the instances
    self.wanted = self.acquired_locks[locking.LEVEL_INSTANCE]

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    instance_names = self.wanted
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
                     in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      # dynamic fields need live data from the primary nodes
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          # an explicit False marks an unreachable node, as opposed to an
          # empty dict which means "no instance is alive there"
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          # combine the admin intent with the live state of the instance
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          disk = instance.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        elif field == "vcpus":
          val = instance.vcpus
        elif field == "tags":
          val = list(instance.GetTags())
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output
2602

    
2603

    
2604
class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  The instance is shut down on its primary node and restarted on its
  first secondary node; the primary role is swapped in the cluster
  configuration.  Only network-mirrored disk templates (DTS_NET_MIRROR)
  can be failed over.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    # Lock the instance now; node locks cannot be computed yet because
    # the instance's nodes are only known once the instance lock is
    # held, so they are filled in later by DeclareLocks.
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # acquire locks on the (now known) primary and secondary nodes
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    # hooks run on the master node plus all of the instance's secondaries
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster, uses a mirrored
    disk template, and that the target (secondary) node has enough
    memory and the required bridges.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      # mirrored templates are supposed to guarantee a secondary node
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
                         instance.name, instance.memory)

    # check bridge existance
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        # a degraded target disk aborts the failover only for running
        # instances, and only when the user did not override it
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        # best-effort: continue with the failover, the operator is
        # responsible for making sure the source node is really down
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down"  %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logger.Info("Starting instance %s on node %s" %
                  (instance.name, target_node))

      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                               ignore_secondaries=True)
      if not disks_ok:
        # roll back disk activation before aborting
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))
2726
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
  """Recursively create a block device tree on the primary node.

  Children are created first (bottom-up), then the device itself;
  every device in the tree is always created.

  Returns True on success, False as soon as any creation fails.

  """
  # depth-first: each child must exist before its parent device
  for child in (device.children or []):
    if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
      return False

  cfg.SetDiskID(device, node)
  created_id = rpc.call_blockdev_create(node, device, device.size,
                                        instance.name, True, info)
  if not created_id:
    return False
  # record the physical id only if the device did not already have one
  if device.physical_id is None:
    device.physical_id = created_id
  return True
2747
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Recursively create a block device tree on a secondary node.

  A device that reports CreateOnSecondary() forces creation of itself
  and all of its descendants; otherwise the recursion just continues
  with the caller's 'force' value and the device itself is skipped.

  Returns True on success, False as soon as any creation fails.

  """
  if device.CreateOnSecondary():
    force = True

  # children first, propagating the (possibly upgraded) force flag
  for child in (device.children or []):
    if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                      child, force, info):
      return False

  if not force:
    # nothing to create at this level on a secondary
    return True

  cfg.SetDiskID(device, node)
  created_id = rpc.call_blockdev_create(node, device, device.size,
                                        instance.name, False, info)
  if not created_id:
    return False
  if device.physical_id is None:
    device.physical_id = created_id
  return True
2776
def _GenerateUniqueNames(cfg, exts):
2777
  """Generate a suitable LV name.
2778

2779
  This will generate a logical volume name for the given instance.
2780

2781
  """
2782
  results = []
2783
  for val in exts:
2784
    new_id = cfg.GenerateUniqueID()
2785
    results.append("%s%s" % (new_id, val))
2786
  return results
2787

    
2788

    
2789
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
  """Build a DRBD8 disk object together with its two LV children.

  names[0] becomes the data LV, names[1] the metadata LV (fixed at
  128 MB); a fresh DRBD port is allocated from the cluster config.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()

  data_lv = objects.Disk(dev_type=constants.LD_LV, size=size,
                         logical_id=(vgname, names[0]))
  # metadata volume size is fixed at 128 MB
  meta_lv = objects.Disk(dev_type=constants.LD_LV, size=128,
                         logical_id=(vgname, names[1]))

  return objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                      logical_id=(primary, secondary, port),
                      children=[data_lv, meta_lv],
                      iv_name=iv_name)
2806
def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  Builds the sda/sdb disk objects for 'template_name': no disks
  (diskless), two plain LVs, two DRBD8 mirrors or two file-backed
  disks.  Raises ProgrammerError for an unknown template or a
  secondary-node count that does not match the template.

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()

  if template_name == constants.DT_DISKLESS:
    return []

  if template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    lv_names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                       logical_id=(vgname, lv_names[0]),
                       iv_name="sda")
    sdb = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                       logical_id=(vgname, lv_names[1]),
                       iv_name="sdb")
    return [sda, sdb]

  if template_name == constants.DT_DRBD8:
    # DRBD needs exactly one mirror node
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    lv_names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                          ".sdb_data", ".sdb_meta"])
    sda = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                               disk_sz, lv_names[0:2], "sda")
    sdb = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                               swap_sz, lv_names[2:4], "sdb")
    return [sda, sdb]

  if template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    sda = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
                       iv_name="sda",
                       logical_id=(file_driver,
                                   "%s/sda" % file_storage_dir))
    sdb = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
                       iv_name="sdb",
                       logical_id=(file_driver,
                                   "%s/sdb" % file_storage_dir))
    return [sda, sdb]

  raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2857
def _GetInstanceInfoText(instance):
2858
  """Compute that text that should be added to the disk's metadata.
2859

2860
  """
2861
  return "originstname+%s" % instance.name
2862

    
2863

    
2864
def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.  For file-based
  instances the storage directory is created first; then every disk is
  created on each secondary node before the primary node (for DRBD the
  secondary half must exist before the primary can attach to it --
  TODO confirm this ordering requirement against the DRBD device code).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    # all file disks live in the same directory; derive it from the
    # first disk's logical_id (driver, path) tuple
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = rpc.call_file_storage_dir_create(instance.primary_node,
                                              file_storage_dir)

    # a false-y result means the RPC itself failed
    if not result:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False

    # result[0] is the remote success flag of the mkdir
    if not result[0]:
      logger.Error("failed to create directory '%s'" % file_storage_dir)
      return False

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False

  return True
2911
def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    # walk the full device tree on every node it spans
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        # best-effort: log the failure and keep removing the rest
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False

  # file-based instances additionally need their storage dir removed
  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
                                            file_storage_dir):
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      result = False

  return result
2948
def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group

  This is currently hard-coded for the two-drive layout; templates
  that use no volume-group space yield None.

  """
  # Required free disk space as a function of disk and swap space;
  # 256 MB are added for drbd metadata, 128MB for each drbd device
  requirements = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    constants.DT_DRBD8: disk_size + swap_size + 256,
    constants.DT_FILE: None,
  }

  try:
    return requirements[disk_template]
  except KeyError:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" %  disk_template)
2970
class LUCreateInstance(LogicalUnit):
2971
  """Create an instance.
2972

2973
  """
2974
  HPATH = "instance-add"
2975
  HTYPE = constants.HTYPE_INSTANCE
2976
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
2977
              "disk_template", "swap_size", "mode", "start", "vcpus",
2978
              "wait_for_sync", "ip_check", "mac"]
2979

    
2980
  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    Asks the configured instance allocator for node placement and
    stores the result in self.op.pnode (and self.op.snode when the
    allocator returns two nodes).

    Raises:
      errors.OpPrereqError: if the allocator fails or returns an
        unexpected number of nodes

    """
    disks = [{"size": self.op.disk_size, "mode": "w"},
             {"size": self.op.swap_size, "mode": "w"}]
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
             "bridge": self.op.bridge}]
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.op.vcpus,
                     mem_size=self.op.mem_size,
                     disks=disks,
                     nics=nics,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      # FIX: the format string has three placeholders but previously
      # only two values were supplied, so this raised TypeError
      # instead of the intended OpPrereqError
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    logger.ToStdout("Selected nodes for the instance: %s" %
                    (", ".join(ial.nodes),))
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
                (self.op.instance_name, self.op.iallocator, ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]
3018
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    The environment describes the instance being created (disk
    template and sizes, add mode, and for imports the source
    node/path/image) on top of the standard per-instance variables.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      # import-specific variables (src attributes are validated in
      # CheckPrereq before hooks run)
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    ))

    # hooks run on the master, the chosen primary and all secondaries
    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl
3050
  def CheckPrereq(self):
    """Check prerequisites.

    Validates the whole instance-creation opcode: creation mode,
    import source (if any), instance name/IP/MAC/bridge, disk
    template, node placement (optionally via the allocator), free
    disk and memory on the target nodes, OS availability and the
    HVM/VNC-specific settings.  Side effects: normalizes several
    self.op fields and sets self.pnode, self.secondaries,
    self.inst_ip, self.instance_status (and self.src_image for
    imports).

    """
    # set optional parameters to none if they don't exist
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
                 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
                 "hvm_nic_type", "hvm_disk_type", "vnc_bind_address"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    # lvm-based templates require a volume group on the cluster
    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      # validate the export source: node, path and export metadata
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      # export_info behaves like a ConfigParser-style object
      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

    #### instance parameters check

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    # instance name verification (also resolves the name)
    hostname1 = utils.HostInfo(self.op.instance_name)

    self.op.instance_name = instance_name = hostname1.name
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    # ip validity checks: None/"none" -> no IP, "auto" -> resolved IP
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = self.op.ip = inst_ip

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      # a reachable IP means the address is already in use somewhere
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname1.ip, instance_name))

    # MAC address verification ("auto" means generate one later)
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # bridge verification: fall back to the cluster default bridge
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    # boot order verification: only the characters a, c, d, n allowed
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")
    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    # the dir must be relative (it's joined under the cluster file
    # storage dir later)
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory not a relative"
                                 " path")
    #### allocator run

    # exactly one of (iallocator, pnode) must be given
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator is not None:
      # fills self.op.pnode (and possibly self.op.snode)
      self._RunAllocator()

    #### node related checks

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                 self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
                                   self.op.snode)
      elif snode_name == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(snode_name)

    # None means the template does not consume volume group space
    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node"  % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")

    # bridge check on primary node
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # memory check on primary node (only relevant if starting it)
    if self.op.start:
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.op.mem_size)

    # hvm_cdrom_image_path verification
    # NOTE(review): os.path.isfile runs on the master node; presumably
    # the image path must be valid there -- confirm against the
    # hypervisor code
    if self.op.hvm_cdrom_image_path is not None:
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    # Xen HVM device type checks
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
        raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
                                   " hypervisor" % self.op.hvm_nic_type)
      if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
        raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
                                   " hypervisor" % self.op.hvm_disk_type)

    # remember the desired initial run state for Exec/hooks
    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'
3286
  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    Allocates the MAC address and (for hypervisors that need one) a
    network port, builds the disk objects, creates the disks on the
    nodes, registers the instance in the configuration and the lock
    manager, optionally waits for the disks to sync, runs the OS
    create/import scripts and finally starts the instance if requested.

    Raises:
      OpExecError: if disk creation, OS installation or instance
        startup fails (partially-created disks are removed first).

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    # MAC: either generated by the cluster or given explicitly
    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    # some hypervisors (e.g. HVM) need a dedicated network port
    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    if self.op.vnc_bind_address is None:
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.sstore.GetFileStorageDir(),
                                        string_file_storage_dir, instance))

    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            hvm_acpi=self.op.hvm_acpi,
                            hvm_pae=self.op.hvm_pae,
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
                            vnc_bind_address=self.op.vnc_bind_address,
                            hvm_nic_type=self.op.hvm_nic_type,
                            hvm_disk_type=self.op.hvm_disk_type,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      # roll back any disks that were created before the failure
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Add the new instance to the Ganeti Lock Manager
    self.context.glm.add(locking.LEVEL_INSTANCE, instance)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      # undo everything: disks, config entry and lock manager entry
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      # Remove the new instance from the Ganeti Lock Manager
      self.context.glm.remove(locking.LEVEL_INSTANCE, iobj.name)
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                                src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")
class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    # only the instance lock is needed; this also expands the name
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    Returns:
      the ssh command line (as built by self.ssh.BuildCmd) that, when
      run on the master node, attaches to the instance's console.

    """
    instance = self.instance
    node = instance.primary_node

    # ask the primary node which instances it is actually running
    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    Returns:
      the name of the node selected by the allocator (also stored in
      self.op.remote_node).

    Raises:
      OpPrereqError: if the allocator fails or returns an unexpected
        number of nodes.

    """
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      # FIX: the original format string had three placeholders but only
      # two arguments, so raising this error crashed with a TypeError
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    logger.ToStdout("Selected new secondary for the instance: %s" %
                    self.op.remote_node)
    # FIX: CheckPrereq assigns our return value to self.op.remote_node;
    # without this return the selected node was clobbered back to None
    return self.op.remote_node

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster, that its disk
    layout supports replacement, and normalizes the replacement mode
    and target/other/new node attributes used by the Exec handlers.

    """
    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    self.op.instance_name = instance.name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      self.op.remote_node = self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
    self.op.remote_node = remote_node

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for dbrd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaces.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s on %s" % (dev.iv_name, pri_node))
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find device %s on node %s" %
                                 (dev.iv_name, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      size = dev.size
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))

      iv_names[dev.iv_name] = (dev, dev.children)

    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for dev in instance.disks:
      size = dev.size
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
      # create new devices on new_node
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=(pri_node, new_node,
                                          dev.logical_id[2]),
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
                                        new_drbd, False,
                                      _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new DRBD on"
                                 " node '%s'" % new_node)

    for dev in instance.disks:
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for %s on old node" % dev.iv_name)
      cfg.SetDiskID(dev, old_node)
      if not rpc.call_blockdev_shutdown(old_node, dev):
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    done = 0
    for dev in instance.disks:
      cfg.SetDiskID(dev, pri_node)
      # set the physical (unique in bdev terms) id to None, meaning
      # detach from network
      dev.physical_id = (None,) * len(dev.physical_id)
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
      if rpc.call_blockdev_find(pri_node, dev):
        done += 1
      else:
        warning("Failed to detach drbd %s from network, unusual case" %
                dev.iv_name)

    if not done:
      # no detaches succeeded (very unlikely)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev in instance.disks:
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    failures = []
    for dev in instance.disks:
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
      # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      # is correct
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
                "please do a gnt-instance info to see the status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        if not rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance

    # Activate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
      self.proc.ChainOpCode(op)

    if instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")

    ret = fn(feedback_fn)

    # Deactivate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
      self.proc.ChainOpCode(op)

    return ret
3958
class LUGrowDisk(LogicalUnit):
3959
  """Grow a disk of an instance.
3960

3961
  """
3962
  HPATH = "disk-grow"
3963
  HTYPE = constants.HTYPE_INSTANCE
3964
  _OP_REQP = ["instance_name", "disk", "amount"]
3965

    
3966
  def BuildHooksEnv(self):
3967
    """Build hooks env.
3968

3969
    This runs on the master, the primary and all the secondaries.
3970

3971
    """
3972
    env = {
3973
      "DISK": self.op.disk,
3974
      "AMOUNT": self.op.amount,
3975
      }
3976
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3977
    nl = [
3978