#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer


class LogicalUnit(object):
48
  """Logical Unit base class.
49

50
  Subclasses must follow these rules:
51
    - implement ExpandNames
52
    - implement CheckPrereq
53
    - implement Exec
54
    - implement BuildHooksEnv
55
    - redefine HPATH and HTYPE
56
    - optionally redefine their run requirements:
57
        REQ_MASTER: the LU needs to run on the master node
58
        REQ_WSSTORE: the LU needs a writable SimpleStore
59
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60

61
  Note that all commands require root permissions.
62

63
  """
64
  HPATH = None
65
  HTYPE = None
66
  _OP_REQP = []
67
  REQ_MASTER = True
68
  REQ_WSSTORE = False
69
  REQ_BGL = True
70

    
71
  def __init__(self, processor, op, context, sstore):
72
    """Constructor for LogicalUnit.
73

74
    This needs to be overridden in derived classes in order to check op
75
    validity.
76

77
    """
78
    self.proc = processor
79
    self.op = op
80
    self.cfg = context.cfg
81
    self.sstore = sstore
82
    self.context = context
83
    self.needed_locks = None
84
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85
    # Used to force good behavior when calling helper functions
86
    self.recalculate_locks = {}
87
    self.__ssh = None
88

    
89
    for attr_name in self._OP_REQP:
90
      attr_val = getattr(op, attr_name, None)
91
      if attr_val is None:
92
        raise errors.OpPrereqError("Required parameter '%s' missing" %
93
                                   attr_name)
94

    
95
    if not self.cfg.IsCluster():
96
      raise errors.OpPrereqError("Cluster not initialized yet,"
97
                                 " use 'gnt-cluster init' first.")
98
    if self.REQ_MASTER:
99
      master = sstore.GetMasterNode()
100
      if master != utils.HostInfo().name:
101
        raise errors.OpPrereqError("Commands must be run on the master"
102
                                   " node %s" % master)
103

    
104
  def __GetSSH(self):
105
    """Returns the SshRunner object
106

107
    """
108
    if not self.__ssh:
109
      self.__ssh = ssh.SshRunner(self.sstore)
110
    return self.__ssh
111

    
112
  ssh = property(fget=__GetSSH)
113

    
114
  def ExpandNames(self):
115
    """Expand names for this LU.
116

117
    This method is called before starting to execute the opcode, and it should
118
    update all the parameters of the opcode to their canonical form (e.g. a
119
    short node name must be fully expanded after this method has successfully
120
    completed). This way locking, hooks, logging, etc. can work correctly.
121

122
    LUs which implement this method must also populate the self.needed_locks
123
    member, as a dict with lock levels as keys, and a list of needed lock names
124
    as values. Rules:
125
      - Use an empty dict if you don't need any lock
126
      - If you don't need any lock at a particular level omit that level
127
      - Don't put anything for the BGL level
128
      - If you want all locks at a level use None as a value
129
        (this reflects what LockSet does, and will be replaced before
130
        CheckPrereq with the full list of nodes that have been locked)
131

132
    If you need to share locks (rather than acquire them exclusively) at one
133
    level you can modify self.share_locks, setting a true value (usually 1) for
134
    that level. By default locks are not shared.
135

136
    Examples:
137
    # Acquire all nodes and one instance
138
    self.needed_locks = {
139
      locking.LEVEL_NODE: None,
140
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
141
    }
142
    # Acquire just two nodes
143
    self.needed_locks = {
144
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
145
    }
146
    # Acquire no locks
147
    self.needed_locks = {} # No, you can't leave it to the default value None
148

149
    """
150
    # The implementation of this method is mandatory only if the new LU is
151
    # concurrent, so that old LUs don't need to be changed all at the same
152
    # time.
153
    if self.REQ_BGL:
154
      self.needed_locks = {} # Exclusive LUs don't need locks.
155
    else:
156
      raise NotImplementedError
157

    
158
  def DeclareLocks(self, level):
159
    """Declare LU locking needs for a level
160

161
    While most LUs can just declare their locking needs at ExpandNames time,
162
    sometimes there's the need to calculate some locks after having acquired
163
    the ones before. This function is called just before acquiring locks at a
164
    particular level, but after acquiring the ones at lower levels, and permits
165
    such calculations. It can be used to modify self.needed_locks, and by
166
    default it does nothing.
167

168
    This function is only called if you have something already set in
169
    self.needed_locks for the level.
170

171
    @param level: Locking level which is going to be locked
172
    @type level: member of ganeti.locking.LEVELS
173

174
    """
175

    
176
  def CheckPrereq(self):
177
    """Check prerequisites for this LU.
178

179
    This method should check that the prerequisites for the execution
180
    of this LU are fulfilled. It can do internode communication, but
181
    it should be idempotent - no cluster or system changes are
182
    allowed.
183

184
    The method should raise errors.OpPrereqError in case something is
185
    not fulfilled. Its return value is ignored.
186

187
    This method should also update all the parameters of the opcode to
188
    their canonical form if it hasn't been done by ExpandNames before.
189

190
    """
191
    raise NotImplementedError
192

    
193
  def Exec(self, feedback_fn):
194
    """Execute the LU.
195

196
    This method should implement the actual work. It should raise
197
    errors.OpExecError for failures that are somewhat dealt with in
198
    code, or expected.
199

200
    """
201
    raise NotImplementedError
202

    
203
  def BuildHooksEnv(self):
204
    """Build hooks environment for this LU.
205

206
    This method should return a three-element tuple consisting of: a dict
207
    containing the environment that will be used for running the
208
    specific hook for this LU, a list of node names on which the hook
209
    should run before the execution, and a list of node names on which
210
    the hook should run after the execution.
211

212
    The keys of the dict must not have 'GANETI_' prefixed as this will
213
    be handled in the hooks runner. Also note additional keys will be
214
    added by the hooks runner. If the LU doesn't define any
215
    environment, an empty dict (and not None) should be returned.
216

217
    If there are no nodes for a phase, an empty list (and not None) should
    be returned.
218

219
    Note that if the HPATH for a LU class is None, this function will
220
    not be called.
221

222
    """
223
    raise NotImplementedError
224

    
225
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
226
    """Notify the LU about the results of its hooks.
227

228
    This method is called every time a hooks phase is executed, and notifies
229
    the Logical Unit about the hooks' result. The LU can then use it to alter
230
    its result based on the hooks.  By default the method does nothing and the
231
    previous result is passed back unchanged but any LU can define it if it
232
    wants to use the local cluster hook-scripts somehow.
233

234
    Args:
235
      phase: the hooks phase that has just been run
236
      hook_results: the results of the multi-node hooks rpc call
237
      feedback_fn: function to send feedback back to the caller
238
      lu_result: the previous result this LU had, or None in the PRE phase.
239

240
    """
241
    return lu_result
242

    
243
  def _ExpandAndLockInstance(self):
244
    """Helper function to expand and lock an instance.
245

246
    Many LUs that work on an instance take its name in self.op.instance_name
247
    and need to expand it and then declare the expanded name for locking. This
248
    function does it, and then updates self.op.instance_name to the expanded
249
    name. It also initializes needed_locks as a dict, if this hasn't been done
250
    before.
251

252
    """
253
    if self.needed_locks is None:
254
      self.needed_locks = {}
255
    else:
256
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
257
        "_ExpandAndLockInstance called with instance-level locks set"
258
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
259
    if expanded_name is None:
260
      raise errors.OpPrereqError("Instance '%s' not known" %
261
                                  self.op.instance_name)
262
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
263
    self.op.instance_name = expanded_name
264

    
265
  def _LockInstancesNodes(self):
266
    """Helper function to declare instances' nodes for locking.
267

268
    This function should be called after locking one or more instances to lock
269
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
270
    with all primary or secondary nodes for instances already locked and
271
    present in self.needed_locks[locking.LEVEL_INSTANCE].
272

273
    It should be called from DeclareLocks, and for safety only works if
274
    self.recalculate_locks[locking.LEVEL_NODE] is set.
275

276
    In the future it may grow parameters to just lock some instance's nodes, or
277
    to just lock primaries or secondary nodes, if needed.
278

279
    It should be called in DeclareLocks in a way similar to:
280

281
    if level == locking.LEVEL_NODE:
282
      self._LockInstancesNodes()
283

284
    """
285
    assert locking.LEVEL_NODE in self.recalculate_locks, \
286
      "_LockInstancesNodes helper function called with no nodes to recalculate"
287

    
288
    # TODO: check that we've really been called with the instance locks held
289

    
290
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
291
    # future we might want to have different behaviors depending on the value
292
    # of self.recalculate_locks[locking.LEVEL_NODE]
293
    wanted_nodes = []
294
    for instance_name in self.needed_locks[locking.LEVEL_INSTANCE]:
295
      instance = self.context.cfg.GetInstanceInfo(instance_name)
296
      wanted_nodes.append(instance.primary_node)
297
      wanted_nodes.extend(instance.secondary_nodes)
298
    self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
299

    
300
    del self.recalculate_locks[locking.LEVEL_NODE]
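
# Editor's note: the class below is an illustrative sketch and is not part of
# the original module (it is never registered with the opcode dispatcher).
# It only shows how a concurrent LU would typically combine ExpandNames,
# DeclareLocks and the locking helpers documented above; the opcode parameter
# 'instance_name' and the class name are assumptions made for the example.
class _ExampleConcurrentLU(LogicalUnit):
  """Sketch of a concurrent LU that locks one instance and its nodes.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    # expand self.op.instance_name and declare the instance-level lock
    self._ExpandAndLockInstance()
    # node locks are computed later, once the instance lock is held
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = 1

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # lock the primary and secondary nodes of the locked instance
      self._LockInstancesNodes()

  def CheckPrereq(self):
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)

  def Exec(self, feedback_fn):
    feedback_fn("Instance %s uses nodes %s" %
                (self.instance.name,
                 [self.instance.primary_node] +
                 list(self.instance.secondary_nodes)))
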
301

    
302

    
303
class NoHooksLU(LogicalUnit):
304
  """Simple LU which runs no hooks.
305

306
  This LU is intended as a parent for other LogicalUnits which will
307
  run no hooks, in order to reduce duplicate code.
308

309
  """
310
  HPATH = None
311
  HTYPE = None
312

    
313

    
314
def _GetWantedNodes(lu, nodes):
315
  """Returns list of checked and expanded node names.
316

317
  Args:
318
    nodes: List of node names (strings); an empty list means all nodes
319

320
  """
321
  if not isinstance(nodes, list):
322
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
323

    
324
  if nodes:
325
    wanted = []
326

    
327
    for name in nodes:
328
      node = lu.cfg.ExpandNodeName(name)
329
      if node is None:
330
        raise errors.OpPrereqError("No such node name '%s'" % name)
331
      wanted.append(node)
332

    
333
  else:
334
    wanted = lu.cfg.GetNodeList()
335
  return utils.NiceSort(wanted)
336

    
337

    
338
def _GetWantedInstances(lu, instances):
339
  """Returns list of checked and expanded instance names.
340

341
  Args:
342
    instances: List of instances (strings) or None for all
343

344
  """
345
  if not isinstance(instances, list):
346
    raise errors.OpPrereqError("Invalid argument type 'instances'")
347

    
348
  if instances:
349
    wanted = []
350

    
351
    for name in instances:
352
      instance = lu.cfg.ExpandInstanceName(name)
353
      if instance is None:
354
        raise errors.OpPrereqError("No such instance name '%s'" % name)
355
      wanted.append(instance)
356

    
357
  else:
358
    wanted = lu.cfg.GetInstanceList()
359
  return utils.NiceSort(wanted)
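
# Illustrative usage sketch (editor's addition): query LUs normally expand
# their name list in CheckPrereq, for example:
#
#   self.wanted = _GetWantedNodes(self, self.op.names)
#
# Passing an empty list selects every node (or instance) in the cluster.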
360

    
361

    
362
def _CheckOutputFields(static, dynamic, selected):
363
  """Checks whether all selected fields are valid.
364

365
  Args:
366
    static: Static fields
367
    dynamic: Dynamic fields
368

369
  """
370
  static_fields = frozenset(static)
371
  dynamic_fields = frozenset(dynamic)
372

    
373
  all_fields = static_fields | dynamic_fields
374

    
375
  if not all_fields.issuperset(selected):
376
    raise errors.OpPrereqError("Unknown output fields selected: %s"
377
                               % ",".join(frozenset(selected).
378
                                          difference(all_fields)))
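
# Editor's illustrative sketch (not used by the original code): shows the
# typical _CheckOutputFields call made from a query LU's CheckPrereq; the
# static/dynamic field names here are invented for the example.
def _ExampleCheckMyFields(op_output_fields):
  """Validate an example output field selection (documentation aid only).

  """
  # raises errors.OpPrereqError if op_output_fields contains unknown names
  _CheckOutputFields(static=["name", "pip"],
                     dynamic=["mfree", "dfree"],
                     selected=op_output_fields)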
379

    
380

    
381
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
382
                          memory, vcpus, nics):
383
  """Builds instance related env variables for hooks from single variables.
384

385
  Args:
386
    secondary_nodes: List of secondary nodes as strings
387
  """
388
  env = {
389
    "OP_TARGET": name,
390
    "INSTANCE_NAME": name,
391
    "INSTANCE_PRIMARY": primary_node,
392
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
393
    "INSTANCE_OS_TYPE": os_type,
394
    "INSTANCE_STATUS": status,
395
    "INSTANCE_MEMORY": memory,
396
    "INSTANCE_VCPUS": vcpus,
397
  }
398

    
399
  if nics:
400
    nic_count = len(nics)
401
    for idx, (ip, bridge, mac) in enumerate(nics):
402
      if ip is None:
403
        ip = ""
404
      env["INSTANCE_NIC%d_IP" % idx] = ip
405
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
406
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
407
  else:
408
    nic_count = 0
409

    
410
  env["INSTANCE_NIC_COUNT"] = nic_count
411

    
412
  return env
413

    
414

    
415
def _BuildInstanceHookEnvByObject(instance, override=None):
416
  """Builds instance related env variables for hooks from an object.
417

418
  Args:
419
    instance: objects.Instance object of instance
420
    override: dict of values to override
421
  """
422
  args = {
423
    'name': instance.name,
424
    'primary_node': instance.primary_node,
425
    'secondary_nodes': instance.secondary_nodes,
426
    'os_type': instance.os,
427
    'status': instance.status,
428
    'memory': instance.memory,
429
    'vcpus': instance.vcpus,
430
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
431
  }
432
  if override:
433
    args.update(override)
434
  return _BuildInstanceHookEnv(**args)
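
# Editor's illustrative sketch (not part of the original module): builds the
# hook environment for a made-up single-NIC instance to show the resulting
# variable names.
def _ExampleInstanceHookEnv():
  """Return a sample instance hook environment (documentation aid only).

  """
  env = _BuildInstanceHookEnv(name="instance1.example.tld",
                              primary_node="node1.example.tld",
                              secondary_nodes=["node2.example.tld"],
                              os_type="debian-etch",
                              status="up",
                              memory=512,
                              vcpus=1,
                              nics=[("198.51.100.10", "xen-br0",
                                     "aa:00:00:11:22:33")])
  # env now maps e.g. INSTANCE_NAME, INSTANCE_PRIMARY, INSTANCE_SECONDARIES,
  # INSTANCE_NIC_COUNT, INSTANCE_NIC0_IP, INSTANCE_NIC0_BRIDGE, ...
  return env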
435

    
436

    
437
def _CheckInstanceBridgesExist(instance):
438
  """Check that the brigdes needed by an instance exist.
439

440
  """
441
  # check bridges existance
442
  brlist = [nic.bridge for nic in instance.nics]
443
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
444
    raise errors.OpPrereqError("one or more target bridges %s does not"
445
                               " exist on destination node '%s'" %
446
                               (brlist, instance.primary_node))
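
# Illustrative usage sketch (editor's addition): this helper is typically
# called from an LU's CheckPrereq before an instance is started or moved,
# for example:
#
#   instance = self.cfg.GetInstanceInfo(self.op.instance_name)
#   _CheckInstanceBridgesExist(instance)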
447

    
448

    
449
class LUDestroyCluster(NoHooksLU):
450
  """Logical unit for destroying the cluster.
451

452
  """
453
  _OP_REQP = []
454

    
455
  def CheckPrereq(self):
456
    """Check prerequisites.
457

458
    This checks whether the cluster is empty.
459

460
    Any errors are signalled by raising errors.OpPrereqError.
461

462
    """
463
    master = self.sstore.GetMasterNode()
464

    
465
    nodelist = self.cfg.GetNodeList()
466
    if len(nodelist) != 1 or nodelist[0] != master:
467
      raise errors.OpPrereqError("There are still %d node(s) in"
468
                                 " this cluster." % (len(nodelist) - 1))
469
    instancelist = self.cfg.GetInstanceList()
470
    if instancelist:
471
      raise errors.OpPrereqError("There are still %d instance(s) in"
472
                                 " this cluster." % len(instancelist))
473

    
474
  def Exec(self, feedback_fn):
475
    """Destroys the cluster.
476

477
    """
478
    master = self.sstore.GetMasterNode()
479
    if not rpc.call_node_stop_master(master, False):
480
      raise errors.OpExecError("Could not disable the master role")
481
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
482
    utils.CreateBackup(priv_key)
483
    utils.CreateBackup(pub_key)
484
    return master
485

    
486

    
487
class LUVerifyCluster(LogicalUnit):
488
  """Verifies the cluster status.
489

490
  """
491
  HPATH = "cluster-verify"
492
  HTYPE = constants.HTYPE_CLUSTER
493
  _OP_REQP = ["skip_checks"]
494

    
495
  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
496
                  remote_version, feedback_fn):
497
    """Run multiple tests against a node.
498

499
    Test list:
500
      - compares ganeti version
501
      - checks vg existence and size > 20G
502
      - checks config file checksum
503
      - checks ssh to other nodes
504

505
    Args:
506
      node: name of the node to check
507
      file_list: required list of files
508
      local_cksum: dictionary of local files and their checksums
509

510
    """
511
    # compares ganeti version
512
    local_version = constants.PROTOCOL_VERSION
513
    if not remote_version:
514
      feedback_fn("  - ERROR: connection to %s failed" % (node))
515
      return True
516

    
517
    if local_version != remote_version:
518
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
519
                      (local_version, node, remote_version))
520
      return True
521

    
522
    # checks vg existence and size > 20G
523

    
524
    bad = False
525
    if not vglist:
526
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
527
                      (node,))
528
      bad = True
529
    else:
530
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
531
                                            constants.MIN_VG_SIZE)
532
      if vgstatus:
533
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
534
        bad = True
535

    
536
    # checks config file checksum
537
    # checks ssh to any
538

    
539
    if 'filelist' not in node_result:
540
      bad = True
541
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
542
    else:
543
      remote_cksum = node_result['filelist']
544
      for file_name in file_list:
545
        if file_name not in remote_cksum:
546
          bad = True
547
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
548
        elif remote_cksum[file_name] != local_cksum[file_name]:
549
          bad = True
550
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
551

    
552
    if 'nodelist' not in node_result:
553
      bad = True
554
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
555
    else:
556
      if node_result['nodelist']:
557
        bad = True
558
        for node in node_result['nodelist']:
559
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
560
                          (node, node_result['nodelist'][node]))
561
    if 'node-net-test' not in node_result:
562
      bad = True
563
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
564
    else:
565
      if node_result['node-net-test']:
566
        bad = True
567
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
568
        for node in nlist:
569
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
570
                          (node, node_result['node-net-test'][node]))
571

    
572
    hyp_result = node_result.get('hypervisor', None)
573
    if hyp_result is not None:
574
      feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
575
    return bad
576

    
577
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
578
                      node_instance, feedback_fn):
579
    """Verify an instance.
580

581
    This function checks to see if the required block devices are
582
    available on the instance's node.
583

584
    """
585
    bad = False
586

    
587
    node_current = instanceconfig.primary_node
588

    
589
    node_vol_should = {}
590
    instanceconfig.MapLVsByNode(node_vol_should)
591

    
592
    for node in node_vol_should:
593
      for volume in node_vol_should[node]:
594
        if node not in node_vol_is or volume not in node_vol_is[node]:
595
          feedback_fn("  - ERROR: volume %s missing on node %s" %
596
                          (volume, node))
597
          bad = True
598

    
599
    if not instanceconfig.status == 'down':
600
      if (node_current not in node_instance or
601
          not instance in node_instance[node_current]):
602
        feedback_fn("  - ERROR: instance %s not running on node %s" %
603
                        (instance, node_current))
604
        bad = True
605

    
606
    for node in node_instance:
607
      if (not node == node_current):
608
        if instance in node_instance[node]:
609
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
610
                          (instance, node))
611
          bad = True
612

    
613
    return bad
614

    
615
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
616
    """Verify if there are any unknown volumes in the cluster.
617

618
    The .os, .swap and backup volumes are ignored. All other volumes are
619
    reported as unknown.
620

621
    """
622
    bad = False
623

    
624
    for node in node_vol_is:
625
      for volume in node_vol_is[node]:
626
        if node not in node_vol_should or volume not in node_vol_should[node]:
627
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
628
                      (volume, node))
629
          bad = True
630
    return bad
631

    
632
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
633
    """Verify the list of running instances.
634

635
    This checks what instances are running but unknown to the cluster.
636

637
    """
638
    bad = False
639
    for node in node_instance:
640
      for runninginstance in node_instance[node]:
641
        if runninginstance not in instancelist:
642
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
643
                          (runninginstance, node))
644
          bad = True
645
    return bad
646

    
647
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
648
    """Verify N+1 Memory Resilience.
649

650
    Check that if one single node dies we can still start all the instances it
651
    was primary for.
652

653
    """
654
    bad = False
655

    
656
    for node, nodeinfo in node_info.iteritems():
657
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to host, should a
      # single other node in the cluster fail.
660
      # FIXME: not ready for failover to an arbitrary node
661
      # FIXME: does not support file-backed instances
662
      # WARNING: we currently take into account down instances as well as up
663
      # ones, considering that even if they're down someone might want to start
664
      # them even in the event of a node failure.
665
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
666
        needed_mem = 0
667
        for instance in instances:
668
          needed_mem += instance_cfg[instance].memory
669
        if nodeinfo['mfree'] < needed_mem:
670
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
671
                      " failovers should node %s fail" % (node, prinode))
672
          bad = True
673
    return bad
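
  # Editor's illustrative note: as a worked example for the check above, with
  # nodeinfo['sinst-by-pnode'] == {"node1": [inst_a, inst_b]} and
  # inst_a.memory + inst_b.memory == 1536 while nodeinfo['mfree'] == 1024,
  # this node could not absorb a failover of node1's primaries, so the
  # cluster would be reported as not N+1 compliant.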
674

    
675
  def CheckPrereq(self):
676
    """Check prerequisites.
677

678
    Transform the list of checks we're going to skip into a set and check that
679
    all its members are valid.
680

681
    """
682
    self.skip_set = frozenset(self.op.skip_checks)
683
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
684
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
685

    
686
  def BuildHooksEnv(self):
687
    """Build hooks env.
688

689
    Cluster-Verify hooks are run only in the post phase; if they fail, their
    output is logged in the verify output and the verification fails.
691

692
    """
693
    all_nodes = self.cfg.GetNodeList()
694
    # TODO: populate the environment with useful information for verify hooks
695
    env = {}
696
    return env, [], all_nodes
697

    
698
  def Exec(self, feedback_fn):
699
    """Verify integrity of cluster, performing various test on nodes.
700

701
    """
702
    bad = False
703
    feedback_fn("* Verifying global settings")
704
    for msg in self.cfg.VerifyConfig():
705
      feedback_fn("  - ERROR: %s" % msg)
706

    
707
    vg_name = self.cfg.GetVGName()
708
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
709
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
710
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
711
    i_non_redundant = [] # Non redundant instances
712
    node_volume = {}
713
    node_instance = {}
714
    node_info = {}
715
    instance_cfg = {}
716

    
717
    # FIXME: verify OS list
718
    # do local checksums
719
    file_names = list(self.sstore.GetFileList())
720
    file_names.append(constants.SSL_CERT_FILE)
721
    file_names.append(constants.CLUSTER_CONF_FILE)
722
    local_checksums = utils.FingerprintFiles(file_names)
723

    
724
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
725
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
726
    all_instanceinfo = rpc.call_instance_list(nodelist)
727
    all_vglist = rpc.call_vg_list(nodelist)
728
    node_verify_param = {
729
      'filelist': file_names,
730
      'nodelist': nodelist,
731
      'hypervisor': None,
732
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
733
                        for node in nodeinfo]
734
      }
735
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
736
    all_rversion = rpc.call_version(nodelist)
737
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
738

    
739
    for node in nodelist:
740
      feedback_fn("* Verifying node %s" % node)
741
      result = self._VerifyNode(node, file_names, local_checksums,
742
                                all_vglist[node], all_nvinfo[node],
743
                                all_rversion[node], feedback_fn)
744
      bad = bad or result
745

    
746
      # node_volume
747
      volumeinfo = all_volumeinfo[node]
748

    
749
      if isinstance(volumeinfo, basestring):
750
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
751
                    (node, volumeinfo[-400:].encode('string_escape')))
752
        bad = True
753
        node_volume[node] = {}
754
      elif not isinstance(volumeinfo, dict):
755
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
756
        bad = True
757
        continue
758
      else:
759
        node_volume[node] = volumeinfo
760

    
761
      # node_instance
762
      nodeinstance = all_instanceinfo[node]
763
      if type(nodeinstance) != list:
764
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
765
        bad = True
766
        continue
767

    
768
      node_instance[node] = nodeinstance
769

    
770
      # node_info
771
      nodeinfo = all_ninfo[node]
772
      if not isinstance(nodeinfo, dict):
773
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
774
        bad = True
775
        continue
776

    
777
      try:
778
        node_info[node] = {
779
          "mfree": int(nodeinfo['memory_free']),
780
          "dfree": int(nodeinfo['vg_free']),
781
          "pinst": [],
782
          "sinst": [],
783
          # dictionary holding all instances this node is secondary for,
784
          # grouped by their primary node. Each key is a cluster node, and each
785
          # value is a list of instances which have the key as primary and the
786
          # current node as secondary.  this is handy to calculate N+1 memory
787
          # availability if you can only failover from a primary to its
788
          # secondary.
789
          "sinst-by-pnode": {},
790
        }
791
      except ValueError:
792
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
793
        bad = True
794
        continue
795

    
796
    node_vol_should = {}
797

    
798
    for instance in instancelist:
799
      feedback_fn("* Verifying instance %s" % instance)
800
      inst_config = self.cfg.GetInstanceInfo(instance)
801
      result =  self._VerifyInstance(instance, inst_config, node_volume,
802
                                     node_instance, feedback_fn)
803
      bad = bad or result
804

    
805
      inst_config.MapLVsByNode(node_vol_should)
806

    
807
      instance_cfg[instance] = inst_config
808

    
809
      pnode = inst_config.primary_node
810
      if pnode in node_info:
811
        node_info[pnode]['pinst'].append(instance)
812
      else:
813
        feedback_fn("  - ERROR: instance %s, connection to primary node"
814
                    " %s failed" % (instance, pnode))
815
        bad = True
816

    
817
      # If the instance is non-redundant we cannot survive losing its primary
818
      # node, so we are not N+1 compliant. On the other hand we have no disk
819
      # templates with more than one secondary so that situation is not well
820
      # supported either.
821
      # FIXME: does not support file-backed instances
822
      if len(inst_config.secondary_nodes) == 0:
823
        i_non_redundant.append(instance)
824
      elif len(inst_config.secondary_nodes) > 1:
825
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
826
                    % instance)
827

    
828
      for snode in inst_config.secondary_nodes:
829
        if snode in node_info:
830
          node_info[snode]['sinst'].append(instance)
831
          if pnode not in node_info[snode]['sinst-by-pnode']:
832
            node_info[snode]['sinst-by-pnode'][pnode] = []
833
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
834
        else:
835
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
836
                      " %s failed" % (instance, snode))
837

    
838
    feedback_fn("* Verifying orphan volumes")
839
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
840
                                       feedback_fn)
841
    bad = bad or result
842

    
843
    feedback_fn("* Verifying remaining instances")
844
    result = self._VerifyOrphanInstances(instancelist, node_instance,
845
                                         feedback_fn)
846
    bad = bad or result
847

    
848
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
849
      feedback_fn("* Verifying N+1 Memory redundancy")
850
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
851
      bad = bad or result
852

    
853
    feedback_fn("* Other Notes")
854
    if i_non_redundant:
855
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
856
                  % len(i_non_redundant))
857

    
858
    return not bad
859

    
860
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
861
    """Analize the post-hooks' result, handle it, and send some
862
    nicely-formatted feedback back to the user.
863

864
    Args:
865
      phase: the hooks phase that has just been run
866
      hooks_results: the results of the multi-node hooks rpc call
867
      feedback_fn: function to send feedback back to the caller
868
      lu_result: previous Exec result
869

870
    """
871
    # We only really run POST phase hooks, and are only interested in
872
    # their results
873
    if phase == constants.HOOKS_PHASE_POST:
874
      # Used to change hooks' output to proper indentation
875
      indent_re = re.compile('^', re.M)
876
      feedback_fn("* Hooks Results")
877
      if not hooks_results:
878
        feedback_fn("  - ERROR: general communication failure")
879
        lu_result = 1
880
      else:
881
        for node_name in hooks_results:
882
          show_node_header = True
883
          res = hooks_results[node_name]
884
          if res is False or not isinstance(res, list):
885
            feedback_fn("    Communication failure")
886
            lu_result = 1
887
            continue
888
          for script, hkr, output in res:
889
            if hkr == constants.HKR_FAIL:
890
              # The node header is only shown once, if there are
891
              # failing hooks on that node
892
              if show_node_header:
893
                feedback_fn("  Node %s:" % node_name)
894
                show_node_header = False
895
              feedback_fn("    ERROR: Script %s failed, output:" % script)
896
              output = indent_re.sub('      ', output)
897
              feedback_fn("%s" % output)
898
              lu_result = 1
899

    
900
      return lu_result
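
# Editor's illustrative note on the data handled above: in the POST phase
# hooks_results maps each node name to a list of (script, status, output)
# tuples, for example (script names invented):
#
#   {"node1.example.tld": [("10-check-disks", constants.HKR_SUCCESS, ""),
#                          ("99-local", constants.HKR_FAIL, "disk full")]}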
901

    
902

    
903
class LUVerifyDisks(NoHooksLU):
904
  """Verifies the cluster disks status.
905

906
  """
907
  _OP_REQP = []
908

    
909
  def CheckPrereq(self):
910
    """Check prerequisites.
911

912
    This has no prerequisites.
913

914
    """
915
    pass
916

    
917
  def Exec(self, feedback_fn):
918
    """Verify integrity of cluster disks.
919

920
    """
921
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
922

    
923
    vg_name = self.cfg.GetVGName()
924
    nodes = utils.NiceSort(self.cfg.GetNodeList())
925
    instances = [self.cfg.GetInstanceInfo(name)
926
                 for name in self.cfg.GetInstanceList()]
927

    
928
    nv_dict = {}
929
    for inst in instances:
930
      inst_lvs = {}
931
      if (inst.status != "up" or
932
          inst.disk_template not in constants.DTS_NET_MIRROR):
933
        continue
934
      inst.MapLVsByNode(inst_lvs)
935
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
936
      for node, vol_list in inst_lvs.iteritems():
937
        for vol in vol_list:
938
          nv_dict[(node, vol)] = inst
939

    
940
    if not nv_dict:
941
      return result
942

    
943
    node_lvs = rpc.call_volume_list(nodes, vg_name)
944

    
945
    to_act = set()
946
    for node in nodes:
947
      # node_volume
948
      lvs = node_lvs[node]
949

    
950
      if isinstance(lvs, basestring):
951
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
952
        res_nlvm[node] = lvs
953
      elif not isinstance(lvs, dict):
954
        logger.Info("connection to node %s failed or invalid data returned" %
955
                    (node,))
956
        res_nodes.append(node)
957
        continue
958

    
959
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
960
        inst = nv_dict.pop((node, lv_name), None)
961
        if (not lv_online and inst is not None
962
            and inst.name not in res_instances):
963
          res_instances.append(inst.name)
964

    
965
    # any leftover items in nv_dict are missing LVs, let's arrange the
966
    # data better
967
    for key, inst in nv_dict.iteritems():
968
      if inst.name not in res_missing:
969
        res_missing[inst.name] = []
970
      res_missing[inst.name].append(key)
971

    
972
    return result
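
# Editor's illustrative note: the tuple returned by LUVerifyDisks.Exec could
# look like the following (all names invented):
#
#   (["node3.example.tld"],                         # unreachable nodes
#    {"node2.example.tld": "lvs: command failed"},  # per-node LVM errors
#    ["instance1.example.tld"],                     # instances with offline
#                                                   # mirror LVs
#    {"instance2.example.tld": [("node1.example.tld", "disk0_data")]})
#                                                   # missing (node, LV) pairs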
973

    
974

    
975
class LURenameCluster(LogicalUnit):
976
  """Rename the cluster.
977

978
  """
979
  HPATH = "cluster-rename"
980
  HTYPE = constants.HTYPE_CLUSTER
981
  _OP_REQP = ["name"]
982
  REQ_WSSTORE = True
983

    
984
  def BuildHooksEnv(self):
985
    """Build hooks env.
986

987
    """
988
    env = {
989
      "OP_TARGET": self.sstore.GetClusterName(),
990
      "NEW_NAME": self.op.name,
991
      }
992
    mn = self.sstore.GetMasterNode()
993
    return env, [mn], [mn]
994

    
995
  def CheckPrereq(self):
996
    """Verify that the passed name is a valid one.
997

998
    """
999
    hostname = utils.HostInfo(self.op.name)
1000

    
1001
    new_name = hostname.name
1002
    self.ip = new_ip = hostname.ip
1003
    old_name = self.sstore.GetClusterName()
1004
    old_ip = self.sstore.GetMasterIP()
1005
    if new_name == old_name and new_ip == old_ip:
1006
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1007
                                 " cluster has changed")
1008
    if new_ip != old_ip:
1009
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1010
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1011
                                   " reachable on the network. Aborting." %
1012
                                   new_ip)
1013

    
1014
    self.op.name = new_name
1015

    
1016
  def Exec(self, feedback_fn):
1017
    """Rename the cluster.
1018

1019
    """
1020
    clustername = self.op.name
1021
    ip = self.ip
1022
    ss = self.sstore
1023

    
1024
    # shutdown the master IP
1025
    master = ss.GetMasterNode()
1026
    if not rpc.call_node_stop_master(master, False):
1027
      raise errors.OpExecError("Could not disable the master role")
1028

    
1029
    try:
1030
      # modify the sstore
1031
      ss.SetKey(ss.SS_MASTER_IP, ip)
1032
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1033

    
1034
      # Distribute updated ss config to all nodes
1035
      myself = self.cfg.GetNodeInfo(master)
1036
      dist_nodes = self.cfg.GetNodeList()
1037
      if myself.name in dist_nodes:
1038
        dist_nodes.remove(myself.name)
1039

    
1040
      logger.Debug("Copying updated ssconf data to all nodes")
1041
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1042
        fname = ss.KeyToFilename(keyname)
1043
        result = rpc.call_upload_file(dist_nodes, fname)
1044
        for to_node in dist_nodes:
1045
          if not result[to_node]:
1046
            logger.Error("copy of file %s to node %s failed" %
1047
                         (fname, to_node))
1048
    finally:
1049
      if not rpc.call_node_start_master(master, False):
1050
        logger.Error("Could not re-enable the master role on the master,"
1051
                     " please restart manually.")
1052

    
1053

    
1054
def _RecursiveCheckIfLVMBased(disk):
1055
  """Check if the given disk or its children are lvm-based.
1056

1057
  Args:
1058
    disk: ganeti.objects.Disk object
1059

1060
  Returns:
1061
    boolean indicating whether a LD_LV dev_type was found or not
1062

1063
  """
1064
  if disk.children:
1065
    for chdisk in disk.children:
1066
      if _RecursiveCheckIfLVMBased(chdisk):
1067
        return True
1068
  return disk.dev_type == constants.LD_LV
1069

    
1070

    
1071
class LUSetClusterParams(LogicalUnit):
1072
  """Change the parameters of the cluster.
1073

1074
  """
1075
  HPATH = "cluster-modify"
1076
  HTYPE = constants.HTYPE_CLUSTER
1077
  _OP_REQP = []
1078

    
1079
  def BuildHooksEnv(self):
1080
    """Build hooks env.
1081

1082
    """
1083
    env = {
1084
      "OP_TARGET": self.sstore.GetClusterName(),
1085
      "NEW_VG_NAME": self.op.vg_name,
1086
      }
1087
    mn = self.sstore.GetMasterNode()
1088
    return env, [mn], [mn]
1089

    
1090
  def CheckPrereq(self):
1091
    """Check prerequisites.
1092

1093
    This checks whether the given parameters are consistent and
    whether the given volume group is valid.
1095

1096
    """
1097
    if not self.op.vg_name:
1098
      instances = [self.cfg.GetInstanceInfo(name)
1099
                   for name in self.cfg.GetInstanceList()]
1100
      for inst in instances:
1101
        for disk in inst.disks:
1102
          if _RecursiveCheckIfLVMBased(disk):
1103
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1104
                                       " lvm-based instances exist")
1105

    
1106
    # if vg_name not None, checks given volume group on all nodes
1107
    if self.op.vg_name:
1108
      node_list = self.cfg.GetNodeList()
1109
      vglist = rpc.call_vg_list(node_list)
1110
      for node in node_list:
1111
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1112
                                              constants.MIN_VG_SIZE)
1113
        if vgstatus:
1114
          raise errors.OpPrereqError("Error on node '%s': %s" %
1115
                                     (node, vgstatus))
1116

    
1117
  def Exec(self, feedback_fn):
1118
    """Change the parameters of the cluster.
1119

1120
    """
1121
    if self.op.vg_name != self.cfg.GetVGName():
1122
      self.cfg.SetVGName(self.op.vg_name)
1123
    else:
1124
      feedback_fn("Cluster LVM configuration already in desired"
1125
                  " state, not changing")
1126

    
1127

    
1128
def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1129
  """Sleep and poll for an instance's disk to sync.
1130

1131
  """
1132
  if not instance.disks:
1133
    return True
1134

    
1135
  if not oneshot:
1136
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1137

    
1138
  node = instance.primary_node
1139

    
1140
  for dev in instance.disks:
1141
    cfgw.SetDiskID(dev, node)
1142

    
1143
  retries = 0
1144
  while True:
1145
    max_time = 0
1146
    done = True
1147
    cumul_degraded = False
1148
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1149
    if not rstats:
1150
      proc.LogWarning("Can't get any data from node %s" % node)
1151
      retries += 1
1152
      if retries >= 10:
1153
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1154
                                 " aborting." % node)
1155
      time.sleep(6)
1156
      continue
1157
    retries = 0
1158
    for i in range(len(rstats)):
1159
      mstat = rstats[i]
1160
      if mstat is None:
1161
        proc.LogWarning("Can't compute data for node %s/%s" %
1162
                        (node, instance.disks[i].iv_name))
1163
        continue
1164
      # we ignore the ldisk parameter
1165
      perc_done, est_time, is_degraded, _ = mstat
1166
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1167
      if perc_done is not None:
1168
        done = False
1169
        if est_time is not None:
1170
          rem_time = "%d estimated seconds remaining" % est_time
1171
          max_time = est_time
1172
        else:
1173
          rem_time = "no time estimate"
1174
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
1175
                     (instance.disks[i].iv_name, perc_done, rem_time))
1176
    if done or oneshot:
1177
      break
1178

    
1179
    time.sleep(min(60, max_time))
1180

    
1181
  if done:
1182
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1183
  return not cumul_degraded
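
# Illustrative usage sketch (editor's addition): _WaitForSync is normally
# invoked right after the disks of a mirrored instance have been (re)created,
# for example:
#
#   disk_ok = _WaitForSync(self.cfg, instance, self.proc)
#   if not disk_ok:
#     raise errors.OpExecError("Disk sync of instance %s failed" %
#                              instance.name)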
1184

    
1185

    
1186
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1187
  """Check that mirrors are not degraded.
1188

1189
  The ldisk parameter, if True, will change the test from the
1190
  is_degraded attribute (which represents overall non-ok status for
1191
  the device(s)) to the ldisk (representing the local storage status).
1192

1193
  """
1194
  cfgw.SetDiskID(dev, node)
1195
  if ldisk:
1196
    idx = 6
1197
  else:
1198
    idx = 5
1199

    
1200
  result = True
1201
  if on_primary or dev.AssembleOnSecondary():
1202
    rstats = rpc.call_blockdev_find(node, dev)
1203
    if not rstats:
1204
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1205
      result = False
1206
    else:
1207
      result = result and (not rstats[idx])
1208
  if dev.children:
1209
    for child in dev.children:
1210
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1211

    
1212
  return result
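
# Illustrative usage sketch (editor's addition): before acting on an
# instance's disks an LU would typically verify each device on the relevant
# node, for example (target_node is assumed):
#
#   for dev in instance.disks:
#     if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
#       raise errors.OpExecError("Disk %s is degraded on target node" %
#                                dev.iv_name)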
1213

    
1214

    
1215
class LUDiagnoseOS(NoHooksLU):
1216
  """Logical unit for OS diagnose/query.
1217

1218
  """
1219
  _OP_REQP = ["output_fields", "names"]
1220

    
1221
  def CheckPrereq(self):
1222
    """Check prerequisites.
1223

1224
    This always succeeds, since this is a pure query LU.
1225

1226
    """
1227
    if self.op.names:
1228
      raise errors.OpPrereqError("Selective OS query not supported")
1229

    
1230
    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1231
    _CheckOutputFields(static=[],
1232
                       dynamic=self.dynamic_fields,
1233
                       selected=self.op.output_fields)
1234

    
1235
  @staticmethod
1236
  def _DiagnoseByOS(node_list, rlist):
1237
    """Remaps a per-node return list into an a per-os per-node dictionary
1238

1239
      Args:
1240
        node_list: a list with the names of all nodes
1241
        rlist: a map with node names as keys and OS objects as values
1242

1243
      Returns:
        map: a map with osnames as keys and as value another map, with
             nodes as keys and a list of OS objects as values, e.g.
             {"debian-etch": {"node1": [<object>,...],
                              "node2": [<object>,]}
             }
1250

1251
    """
1252
    all_os = {}
1253
    for node_name, nr in rlist.iteritems():
1254
      if not nr:
1255
        continue
1256
      for os_obj in nr:
1257
        if os_obj.name not in all_os:
1258
          # build a list of nodes for this os containing empty lists
1259
          # for each node in node_list
1260
          all_os[os_obj.name] = {}
1261
          for nname in node_list:
1262
            all_os[os_obj.name][nname] = []
1263
        all_os[os_obj.name][node_name].append(os_obj)
1264
    return all_os
1265

    
1266
  def Exec(self, feedback_fn):
1267
    """Compute the list of OSes.
1268

1269
    """
1270
    node_list = self.cfg.GetNodeList()
1271
    node_data = rpc.call_os_diagnose(node_list)
1272
    if node_data == False:
1273
      raise errors.OpExecError("Can't gather the list of OSes")
1274
    pol = self._DiagnoseByOS(node_list, node_data)
1275
    output = []
1276
    for os_name, os_data in pol.iteritems():
1277
      row = []
1278
      for field in self.op.output_fields:
1279
        if field == "name":
1280
          val = os_name
1281
        elif field == "valid":
1282
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1283
        elif field == "node_status":
1284
          val = {}
1285
          for node_name, nos_list in os_data.iteritems():
1286
            val[node_name] = [(v.status, v.path) for v in nos_list]
1287
        else:
1288
          raise errors.ParameterError(field)
1289
        row.append(val)
1290
      output.append(row)
1291

    
1292
    return output
1293

    
1294

    
1295
class LURemoveNode(LogicalUnit):
1296
  """Logical unit for removing a node.
1297

1298
  """
1299
  HPATH = "node-remove"
1300
  HTYPE = constants.HTYPE_NODE
1301
  _OP_REQP = ["node_name"]
1302

    
1303
  def BuildHooksEnv(self):
1304
    """Build hooks env.
1305

1306
    This doesn't run on the target node in the pre phase as a failed
1307
    node would then be impossible to remove.
1308

1309
    """
1310
    env = {
1311
      "OP_TARGET": self.op.node_name,
1312
      "NODE_NAME": self.op.node_name,
1313
      }
1314
    all_nodes = self.cfg.GetNodeList()
1315
    all_nodes.remove(self.op.node_name)
1316
    return env, all_nodes, all_nodes
1317

    
1318
  def CheckPrereq(self):
1319
    """Check prerequisites.
1320

1321
    This checks:
1322
     - the node exists in the configuration
1323
     - it does not have primary or secondary instances
1324
     - it's not the master
1325

1326
    Any errors are signalled by raising errors.OpPrereqError.
1327

1328
    """
1329
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1330
    if node is None:
1331
      raise errors.OpPrereqError("Node '%s' is unknown." %
                                 self.op.node_name)
1332

    
1333
    instance_list = self.cfg.GetInstanceList()
1334

    
1335
    masternode = self.sstore.GetMasterNode()
1336
    if node.name == masternode:
1337
      raise errors.OpPrereqError("Node is the master node,"
1338
                                 " you need to failover first.")
1339

    
1340
    for instance_name in instance_list:
1341
      instance = self.cfg.GetInstanceInfo(instance_name)
1342
      if node.name == instance.primary_node:
1343
        raise errors.OpPrereqError("Instance %s still running on the node,"
1344
                                   " please remove first." % instance_name)
1345
      if node.name in instance.secondary_nodes:
1346
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
1347
                                   " please remove first." % instance_name)
1348
    self.op.node_name = node.name
1349
    self.node = node
1350

    
1351
  def Exec(self, feedback_fn):
1352
    """Removes the node from the cluster.
1353

1354
    """
1355
    node = self.node
1356
    logger.Info("stopping the node daemon and removing configs from node %s" %
1357
                node.name)
1358

    
1359
    self.context.RemoveNode(node.name)
1360

    
1361
    rpc.call_node_leave_cluster(node.name)
1362

    
1363

    
1364
class LUQueryNodes(NoHooksLU):
1365
  """Logical unit for querying nodes.
1366

1367
  """
1368
  _OP_REQP = ["output_fields", "names"]
1369

    
1370
  def CheckPrereq(self):
1371
    """Check prerequisites.
1372

1373
    This checks that the fields required are valid output fields.
1374

1375
    """
1376
    self.dynamic_fields = frozenset([
1377
      "dtotal", "dfree",
1378
      "mtotal", "mnode", "mfree",
1379
      "bootid",
1380
      "ctotal",
1381
      ])
1382

    
1383
    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1384
                               "pinst_list", "sinst_list",
1385
                               "pip", "sip", "tags"],
1386
                       dynamic=self.dynamic_fields,
1387
                       selected=self.op.output_fields)
1388

    
1389
    self.wanted = _GetWantedNodes(self, self.op.names)
1390

    
1391
  def Exec(self, feedback_fn):
1392
    """Computes the list of nodes and their attributes.
1393

1394
    """
1395
    nodenames = self.wanted
1396
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1397

    
1398
    # begin data gathering
1399

    
1400
    if self.dynamic_fields.intersection(self.op.output_fields):
1401
      live_data = {}
1402
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1403
      for name in nodenames:
1404
        nodeinfo = node_data.get(name, None)
1405
        if nodeinfo:
1406
          live_data[name] = {
1407
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1408
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1409
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1410
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1411
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1412
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1413
            "bootid": nodeinfo['bootid'],
1414
            }
1415
        else:
1416
          live_data[name] = {}
1417
    else:
1418
      live_data = dict.fromkeys(nodenames, {})
1419

    
1420
    node_to_primary = dict([(name, set()) for name in nodenames])
1421
    node_to_secondary = dict([(name, set()) for name in nodenames])
1422

    
1423
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1424
                             "sinst_cnt", "sinst_list"))
1425
    if inst_fields & frozenset(self.op.output_fields):
1426
      instancelist = self.cfg.GetInstanceList()
1427

    
1428
      for instance_name in instancelist:
1429
        inst = self.cfg.GetInstanceInfo(instance_name)
1430
        if inst.primary_node in node_to_primary:
1431
          node_to_primary[inst.primary_node].add(inst.name)
1432
        for secnode in inst.secondary_nodes:
1433
          if secnode in node_to_secondary:
1434
            node_to_secondary[secnode].add(inst.name)
1435

    
1436
    # end data gathering
1437

    
1438
    output = []
1439
    for node in nodelist:
1440
      node_output = []
1441
      for field in self.op.output_fields:
1442
        if field == "name":
1443
          val = node.name
1444
        elif field == "pinst_list":
1445
          val = list(node_to_primary[node.name])
1446
        elif field == "sinst_list":
1447
          val = list(node_to_secondary[node.name])
1448
        elif field == "pinst_cnt":
1449
          val = len(node_to_primary[node.name])
1450
        elif field == "sinst_cnt":
1451
          val = len(node_to_secondary[node.name])
1452
        elif field == "pip":
1453
          val = node.primary_ip
1454
        elif field == "sip":
1455
          val = node.secondary_ip
1456
        elif field == "tags":
1457
          val = list(node.GetTags())
1458
        elif field in self.dynamic_fields:
1459
          val = live_data[node.name].get(field, None)
1460
        else:
1461
          raise errors.ParameterError(field)
1462
        node_output.append(val)
1463
      output.append(node_output)
1464

    
1465
    return output
1466

    
1467

    
1468
class LUQueryNodeVolumes(NoHooksLU):
1469
  """Logical unit for getting volumes on node(s).
1470

1471
  """
1472
  _OP_REQP = ["nodes", "output_fields"]
1473

    
1474
  def CheckPrereq(self):
1475
    """Check prerequisites.
1476

1477
    This checks that the fields required are valid output fields.
1478

1479
    """
1480
    self.nodes = _GetWantedNodes(self, self.op.nodes)
1481

    
1482
    _CheckOutputFields(static=["node"],
1483
                       dynamic=["phys", "vg", "name", "size", "instance"],
1484
                       selected=self.op.output_fields)
1485

    
1486

    
1487
  def Exec(self, feedback_fn):
1488
    """Computes the list of nodes and their attributes.
1489

1490
    """
1491
    nodenames = self.nodes
1492
    volumes = rpc.call_node_volumes(nodenames)
1493

    
1494
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1495
             in self.cfg.GetInstanceList()]
1496

    
1497
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1498

    
1499
    output = []
1500
    for node in nodenames:
1501
      if node not in volumes or not volumes[node]:
1502
        continue
1503

    
1504
      node_vols = volumes[node][:]
1505
      node_vols.sort(key=lambda vol: vol['dev'])
1506

    
1507
      for vol in node_vols:
1508
        node_output = []
1509
        for field in self.op.output_fields:
1510
          if field == "node":
1511
            val = node
1512
          elif field == "phys":
1513
            val = vol['dev']
1514
          elif field == "vg":
1515
            val = vol['vg']
1516
          elif field == "name":
1517
            val = vol['name']
1518
          elif field == "size":
1519
            val = int(float(vol['size']))
1520
          elif field == "instance":
1521
            for inst in ilist:
1522
              if node not in lv_by_node[inst]:
1523
                continue
1524
              if vol['name'] in lv_by_node[inst][node]:
1525
                val = inst.name
1526
                break
1527
            else:
1528
              val = '-'
1529
          else:
1530
            raise errors.ParameterError(field)
1531
          node_output.append(str(val))
1532

    
1533
        output.append(node_output)
1534

    
1535
    return output


class LUAddNode(LogicalUnit):
1539
  """Logical unit for adding node to the cluster.
1540

1541
  """
1542
  HPATH = "node-add"
1543
  HTYPE = constants.HTYPE_NODE
1544
  _OP_REQP = ["node_name"]
1545

    
1546
  def BuildHooksEnv(self):
1547
    """Build hooks env.
1548

1549
    This will run on all nodes before, and on all nodes + the new node after.
1550

1551
    """
1552
    env = {
1553
      "OP_TARGET": self.op.node_name,
1554
      "NODE_NAME": self.op.node_name,
1555
      "NODE_PIP": self.op.primary_ip,
1556
      "NODE_SIP": self.op.secondary_ip,
1557
      }
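    # the pre-hook runs on the currently configured nodes only; the
    # post-hook list below also includes the node being added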
1558
    nodes_0 = self.cfg.GetNodeList()
1559
    nodes_1 = nodes_0 + [self.op.node_name, ]
1560
    return env, nodes_0, nodes_1
1561

    
1562
  def CheckPrereq(self):
1563
    """Check prerequisites.
1564

1565
    This checks:
1566
     - the new node is not already in the config
1567
     - it is resolvable
1568
     - its parameters (single/dual homed) matches the cluster
1569

1570
    Any errors are signalled by raising errors.OpPrereqError.
1571

1572
    """
1573
    node_name = self.op.node_name
1574
    cfg = self.cfg
1575

    
1576
    dns_data = utils.HostInfo(node_name)
1577

    
1578
    node = dns_data.name
1579
    primary_ip = self.op.primary_ip = dns_data.ip
1580
    secondary_ip = getattr(self.op, "secondary_ip", None)
1581
    if secondary_ip is None:
1582
      secondary_ip = primary_ip
1583
    if not utils.IsValidIP(secondary_ip):
1584
      raise errors.OpPrereqError("Invalid secondary IP given")
1585
    self.op.secondary_ip = secondary_ip
1586

    
1587
    node_list = cfg.GetNodeList()
1588
    if not self.op.readd and node in node_list:
1589
      raise errors.OpPrereqError("Node %s is already in the configuration" %
1590
                                 node)
1591
    elif self.op.readd and node not in node_list:
1592
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1593

    
1594
    for existing_node_name in node_list:
1595
      existing_node = cfg.GetNodeInfo(existing_node_name)
1596

    
1597
      if self.op.readd and node == existing_node_name:
1598
        if (existing_node.primary_ip != primary_ip or
1599
            existing_node.secondary_ip != secondary_ip):
1600
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
1601
                                     " address configuration as before")
1602
        continue
1603

    
1604
      if (existing_node.primary_ip == primary_ip or
1605
          existing_node.secondary_ip == primary_ip or
1606
          existing_node.primary_ip == secondary_ip or
1607
          existing_node.secondary_ip == secondary_ip):
1608
        raise errors.OpPrereqError("New node ip address(es) conflict with"
1609
                                   " existing node %s" % existing_node.name)
1610

    
1611
    # check that the type of the node (single versus dual homed) is the
1612
    # same as for the master
1613
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1614
    master_singlehomed = myself.secondary_ip == myself.primary_ip
1615
    newbie_singlehomed = secondary_ip == primary_ip
1616
    if master_singlehomed != newbie_singlehomed:
1617
      if master_singlehomed:
1618
        raise errors.OpPrereqError("The master has no private ip but the"
1619
                                   " new node has one")
1620
      else:
1621
        raise errors.OpPrereqError("The master has a private ip but the"
1622
                                   " new node doesn't have one")
1623

    
1624
    # checks reachablity
1625
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1626
      raise errors.OpPrereqError("Node not reachable by ping")
1627

    
1628
    if not newbie_singlehomed:
1629
      # check reachability from my secondary ip to newbie's secondary ip
1630
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1631
                           source=myself.secondary_ip):
1632
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1633
                                   " based ping to noded port")
1634

    
1635
    self.new_node = objects.Node(name=node,
1636
                                 primary_ip=primary_ip,
1637
                                 secondary_ip=secondary_ip)
1638

    
1639
  def Exec(self, feedback_fn):
1640
    """Adds the new node to the cluster.
1641

1642
    """
1643
    new_node = self.new_node
1644
    node = new_node.name
1645

    
1646
    # check connectivity
1647
    result = rpc.call_version([node])[node]
1648
    if result:
1649
      if constants.PROTOCOL_VERSION == result:
1650
        logger.Info("communication to node %s fine, sw version %s match" %
1651
                    (node, result))
1652
      else:
1653
        raise errors.OpExecError("Version mismatch master version %s,"
1654
                                 " node version %s" %
1655
                                 (constants.PROTOCOL_VERSION, result))
1656
    else:
1657
      raise errors.OpExecError("Cannot get version from the new node")
1658

    
1659
    # setup ssh on node
1660
    logger.Info("copy ssh key to node %s" % node)
1661
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1662
    keyarray = []
1663
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1664
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1665
                priv_key, pub_key]
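    # the files above are the cluster's ssh host key pairs (DSA and RSA)
    # plus the Ganeti user's key pair; their contents are pushed verbatim
    # to the new node via the node_add RPC below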
1666

    
1667
    for i in keyfiles:
1668
      f = open(i, 'r')
1669
      try:
1670
        keyarray.append(f.read())
1671
      finally:
1672
        f.close()
1673

    
1674
    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1675
                               keyarray[3], keyarray[4], keyarray[5])
1676

    
1677
    if not result:
1678
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1679

    
1680
    # Add node to our /etc/hosts, and add key to known_hosts
1681
    utils.AddHostToEtcHosts(new_node.name)
1682

    
1683
    if new_node.secondary_ip != new_node.primary_ip:
1684
      if not rpc.call_node_tcp_ping(new_node.name,
1685
                                    constants.LOCALHOST_IP_ADDRESS,
1686
                                    new_node.secondary_ip,
1687
                                    constants.DEFAULT_NODED_PORT,
1688
                                    10, False):
1689
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1690
                                 " you gave (%s). Please fix and re-run this"
1691
                                 " command." % new_node.secondary_ip)
1692

    
1693
    node_verify_list = [self.sstore.GetMasterNode()]
1694
    node_verify_param = {
1695
      'nodelist': [node],
1696
      # TODO: do a node-net-test as well?
1697
    }
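    # only the master verifies the new node here, and only the
    # ssh/hostname check ('nodelist') is requested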
1698

    
1699
    result = rpc.call_node_verify(node_verify_list, node_verify_param)
1700
    for verifier in node_verify_list:
1701
      if not result[verifier]:
1702
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
1703
                                 " for remote verification" % verifier)
1704
      if result[verifier]['nodelist']:
1705
        for failed in result[verifier]['nodelist']:
1706
          feedback_fn("ssh/hostname verification failed %s -> %s" %
1707
                      (verifier, result[verifier]['nodelist'][failed]))
1708
        raise errors.OpExecError("ssh/hostname verification failed.")
1709

    
1710
    # Distribute updated /etc/hosts and known_hosts to all nodes,
1711
    # including the node just added
1712
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1713
    dist_nodes = self.cfg.GetNodeList()
1714
    if not self.op.readd:
1715
      dist_nodes.append(node)
1716
    if myself.name in dist_nodes:
1717
      dist_nodes.remove(myself.name)
1718

    
1719
    logger.Debug("Copying hosts and known_hosts to all nodes")
1720
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1721
      result = rpc.call_upload_file(dist_nodes, fname)
1722
      for to_node in dist_nodes:
1723
        if not result[to_node]:
1724
          logger.Error("copy of file %s to node %s failed" %
1725
                       (fname, to_node))
1726

    
1727
    to_copy = self.sstore.GetFileList()
1728
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1729
      to_copy.append(constants.VNC_PASSWORD_FILE)
1730
    for fname in to_copy:
1731
      result = rpc.call_upload_file([node], fname)
1732
      if not result[node]:
1733
        logger.Error("could not copy file %s to node %s" % (fname, node))
1734

    
1735
    if self.op.readd:
1736
      self.context.ReaddNode(new_node)
1737
    else:
1738
      self.context.AddNode(new_node)


class LUQueryClusterInfo(NoHooksLU):
1742
  """Query cluster configuration.
1743

1744
  """
1745
  _OP_REQP = []
1746
  REQ_MASTER = False
1747
  REQ_BGL = False
1748

    
1749
  def ExpandNames(self):
1750
    self.needed_locks = {}
1751

    
1752
  def CheckPrereq(self):
    """No prerequisites needed for this LU.
1754

1755
    """
1756
    pass
1757

    
1758
  def Exec(self, feedback_fn):
1759
    """Return cluster config.
1760

1761
    """
1762
    result = {
1763
      "name": self.sstore.GetClusterName(),
1764
      "software_version": constants.RELEASE_VERSION,
1765
      "protocol_version": constants.PROTOCOL_VERSION,
1766
      "config_version": constants.CONFIG_VERSION,
1767
      "os_api_version": constants.OS_API_VERSION,
1768
      "export_version": constants.EXPORT_VERSION,
1769
      "master": self.sstore.GetMasterNode(),
1770
      "architecture": (platform.architecture()[0], platform.machine()),
1771
      "hypervisor_type": self.sstore.GetHypervisorType(),
1772
      }
1773

    
1774
    return result


class LUDumpClusterConfig(NoHooksLU):
1778
  """Return a text-representation of the cluster-config.
1779

1780
  """
1781
  _OP_REQP = []
1782
  REQ_BGL = False
1783

    
1784
  def ExpandNames(self):
1785
    self.needed_locks = {}
1786

    
1787
  def CheckPrereq(self):
1788
    """No prerequisites.
1789

1790
    """
1791
    pass
1792

    
1793
  def Exec(self, feedback_fn):
1794
    """Dump a representation of the cluster config to the standard output.
1795

1796
    """
1797
    return self.cfg.DumpConfig()


class LUActivateInstanceDisks(NoHooksLU):
1801
  """Bring up an instance's disks.
1802

1803
  """
1804
  _OP_REQP = ["instance_name"]
1805

    
1806
  def CheckPrereq(self):
1807
    """Check prerequisites.
1808

1809
    This checks that the instance is in the cluster.
1810

1811
    """
1812
    instance = self.cfg.GetInstanceInfo(
1813
      self.cfg.ExpandInstanceName(self.op.instance_name))
1814
    if instance is None:
1815
      raise errors.OpPrereqError("Instance '%s' not known" %
1816
                                 self.op.instance_name)
1817
    self.instance = instance
1818

    
1819

    
1820
  def Exec(self, feedback_fn):
1821
    """Activate the disks.
1822

1823
    """
1824
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1825
    if not disks_ok:
1826
      raise errors.OpExecError("Cannot activate block devices")
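    # disks_info is the (host, instance-visible name, node-visible name)
    # mapping computed by _AssembleInstanceDisks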
1827

    
1828
    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1832
  """Prepare the block devices for an instance.
1833

1834
  This sets up the block devices on all nodes.
1835

1836
  Args:
1837
    instance: a ganeti.objects.Instance object
1838
    ignore_secondaries: if true, errors on secondary nodes won't result
1839
                        in an error return from the function
1840

1841
  Returns:
1842
    a tuple (disks_ok, device_info): disks_ok is False if the operation
    failed, and device_info is a list of (host, instance_visible_name,
    node_visible_name) tuples with the mapping from node devices to
    instance devices
1845
  """
1846
  device_info = []
1847
  disks_ok = True
1848
  iname = instance.name
1849
  # With the two passes mechanism we try to reduce the window of
1850
  # opportunity for the race condition of switching DRBD to primary
1851
  # before handshaking occurred, but we do not eliminate it
1852

    
1853
  # The proper fix would be to wait (with some limits) until the
1854
  # connection has been made and drbd transitions from WFConnection
1855
  # into any other network-connected state (Connected, SyncTarget,
1856
  # SyncSource, etc.)
1857

    
1858
  # 1st pass, assemble on all nodes in secondary mode
1859
  for inst_disk in instance.disks:
1860
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1861
      cfg.SetDiskID(node_disk, node)
1862
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1863
      if not result:
1864
        logger.Error("could not prepare block device %s on node %s"
1865
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1866
        if not ignore_secondaries:
1867
          disks_ok = False
1868

    
1869
  # FIXME: race condition on drbd migration to primary
1870

    
1871
  # 2nd pass, do only the primary node
1872
  for inst_disk in instance.disks:
1873
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1874
      if node != instance.primary_node:
1875
        continue
1876
      cfg.SetDiskID(node_disk, node)
1877
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1878
      if not result:
1879
        logger.Error("could not prepare block device %s on node %s"
1880
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1881
        disks_ok = False
1882
    device_info.append((instance.primary_node, inst_disk.iv_name, result))
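    # only the primary-node mapping is recorded in device_info; the
    # secondaries were already assembled in the first pass above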
1883

    
1884
  # leave the disks configured for the primary node
1885
  # this is a workaround that would be fixed better by
1886
  # improving the logical/physical id handling
1887
  for disk in instance.disks:
1888
    cfg.SetDiskID(disk, instance.primary_node)
1889

    
1890
  return disks_ok, device_info


def _StartInstanceDisks(cfg, instance, force):
1894
  """Start the disks of an instance.
1895

1896
  """
1897
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1898
                                           ignore_secondaries=force)
1899
  if not disks_ok:
1900
    _ShutdownInstanceDisks(instance, cfg)
1901
    if force is not None and not force:
1902
      logger.Error("If the message above refers to a secondary node,"
1903
                   " you can retry the operation using '--force'.")
1904
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
1908
  """Shutdown an instance's disks.
1909

1910
  """
1911
  _OP_REQP = ["instance_name"]
1912

    
1913
  def CheckPrereq(self):
1914
    """Check prerequisites.
1915

1916
    This checks that the instance is in the cluster.
1917

1918
    """
1919
    instance = self.cfg.GetInstanceInfo(
1920
      self.cfg.ExpandInstanceName(self.op.instance_name))
1921
    if instance is None:
1922
      raise errors.OpPrereqError("Instance '%s' not known" %
1923
                                 self.op.instance_name)
1924
    self.instance = instance
1925

    
1926
  def Exec(self, feedback_fn):
1927
    """Deactivate the disks
1928

1929
    """
1930
    instance = self.instance
1931
    ins_l = rpc.call_instance_list([instance.primary_node])
1932
    ins_l = ins_l[instance.primary_node]
1933
    if not isinstance(ins_l, list):
1934
      raise errors.OpExecError("Can't contact node '%s'" %
1935
                               instance.primary_node)
1936

    
1937
    if self.instance.name in ins_l:
1938
      raise errors.OpExecError("Instance is running, can't shutdown"
1939
                               " block devices.")
1940

    
1941
    _ShutdownInstanceDisks(instance, self.cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1945
  """Shutdown block devices of an instance.
1946

1947
  This does the shutdown on all nodes of the instance.
1948

1949
  If ignore_primary is true, errors on the primary node are ignored
  (they do not affect the return value).
1951

1952
  """
1953
  result = True
1954
  for disk in instance.disks:
1955
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1956
      cfg.SetDiskID(top_disk, node)
1957
      if not rpc.call_blockdev_shutdown(node, top_disk):
1958
        logger.Error("could not shutdown block device %s on node %s" %
1959
                     (disk.iv_name, node))
1960
        if not ignore_primary or node != instance.primary_node:
1961
          result = False
1962
  return result


def _CheckNodeFreeMemory(cfg, node, reason, requested):
1966
  """Checks if a node has enough free memory.
1967

1968
  This function check if a given node has the needed amount of free
1969
  memory. In case the node has less memory or we cannot get the
1970
  information from the node, this function raises an OpPrereqError
1971
  exception.
1972

1973
  Args:
1974
    - cfg: a ConfigWriter instance
1975
    - node: the node name
1976
    - reason: string to use in the error message
1977
    - requested: the amount of memory in MiB
1978

1979
  """
1980
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
1981
  if not nodeinfo or not isinstance(nodeinfo, dict):
1982
    raise errors.OpPrereqError("Could not contact node %s for resource"
1983
                             " information" % (node,))
1984

    
1985
  free_mem = nodeinfo[node].get('memory_free')
1986
  if not isinstance(free_mem, int):
1987
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
1988
                             " was '%s'" % (node, free_mem))
1989
  if requested > free_mem:
1990
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
1991
                             " needed %s MiB, available %s MiB" %
1992
                             (node, reason, requested, free_mem))
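  # typical use, e.g. from LUStartupInstance.CheckPrereq below:
  #   _CheckNodeFreeMemory(self.cfg, instance.primary_node,
  #                        "starting instance %s" % instance.name,
  #                        instance.memory)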


class LUStartupInstance(LogicalUnit):
1996
  """Starts an instance.
1997

1998
  """
1999
  HPATH = "instance-start"
2000
  HTYPE = constants.HTYPE_INSTANCE
2001
  _OP_REQP = ["instance_name", "force"]
2002
  REQ_BGL = False
2003

    
2004
  def ExpandNames(self):
2005
    self._ExpandAndLockInstance()
2006
    self.needed_locks[locking.LEVEL_NODE] = []
2007
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
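    # the node locks are not known yet; they are filled in by
    # _LockInstancesNodes() in DeclareLocks once the instance lock is held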
2008

    
2009
  def DeclareLocks(self, level):
2010
    if level == locking.LEVEL_NODE:
2011
      self._LockInstancesNodes()
2012

    
2013
  def BuildHooksEnv(self):
2014
    """Build hooks env.
2015

2016
    This runs on master, primary and secondary nodes of the instance.
2017

2018
    """
2019
    env = {
2020
      "FORCE": self.op.force,
2021
      }
2022
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2023
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2024
          list(self.instance.secondary_nodes))
2025
    return env, nl, nl
2026

    
2027
  def CheckPrereq(self):
2028
    """Check prerequisites.
2029

2030
    This checks that the instance is in the cluster.
2031

2032
    """
2033
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2034
    assert self.instance is not None, \
2035
      "Cannot retrieve locked instance %s" % self.op.instance_name
2036

    
2037
    # check bridges existence
2038
    _CheckInstanceBridgesExist(instance)
2039

    
2040
    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2041
                         "starting instance %s" % instance.name,
2042
                         instance.memory)
2043

    
2044
  def Exec(self, feedback_fn):
2045
    """Start the instance.
2046

2047
    """
2048
    instance = self.instance
2049
    force = self.op.force
2050
    extra_args = getattr(self.op, "extra_args", "")
2051

    
2052
    self.cfg.MarkInstanceUp(instance.name)
2053

    
2054
    node_current = instance.primary_node
2055

    
2056
    _StartInstanceDisks(self.cfg, instance, force)
2057

    
2058
    if not rpc.call_instance_start(node_current, instance, extra_args):
2059
      _ShutdownInstanceDisks(instance, self.cfg)
2060
      raise errors.OpExecError("Could not start instance")


class LURebootInstance(LogicalUnit):
2064
  """Reboot an instance.
2065

2066
  """
2067
  HPATH = "instance-reboot"
2068
  HTYPE = constants.HTYPE_INSTANCE
2069
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2070
  REQ_BGL = False
2071

    
2072
  def ExpandNames(self):
2073
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2074
                                   constants.INSTANCE_REBOOT_HARD,
2075
                                   constants.INSTANCE_REBOOT_FULL]:
2076
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2077
                                  (constants.INSTANCE_REBOOT_SOFT,
2078
                                   constants.INSTANCE_REBOOT_HARD,
2079
                                   constants.INSTANCE_REBOOT_FULL))
2080
    self._ExpandAndLockInstance()
2081
    self.needed_locks[locking.LEVEL_NODE] = []
2082
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2083

    
2084
  def DeclareLocks(self, level):
2085
    if level == locking.LEVEL_NODE:
2086
      # FIXME: lock only primary on (not constants.INSTANCE_REBOOT_FULL)
2087
      self._LockInstancesNodes()
2088

    
2089
  def BuildHooksEnv(self):
2090
    """Build hooks env.
2091

2092
    This runs on master, primary and secondary nodes of the instance.
2093

2094
    """
2095
    env = {
2096
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2097
      }
2098
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2099
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2100
          list(self.instance.secondary_nodes))
2101
    return env, nl, nl
2102

    
2103
  def CheckPrereq(self):
2104
    """Check prerequisites.
2105

2106
    This checks that the instance is in the cluster.
2107

2108
    """
2109
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2110
    assert self.instance is not None, \
2111
      "Cannot retrieve locked instance %s" % self.op.instance_name
2112

    
2113
    # check bridges existence
2114
    _CheckInstanceBridgesExist(instance)
2115

    
2116
  def Exec(self, feedback_fn):
2117
    """Reboot the instance.
2118

2119
    """
2120
    instance = self.instance
2121
    ignore_secondaries = self.op.ignore_secondaries
2122
    reboot_type = self.op.reboot_type
2123
    extra_args = getattr(self.op, "extra_args", "")
2124

    
2125
    node_current = instance.primary_node
2126

    
2127
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2128
                       constants.INSTANCE_REBOOT_HARD]:
2129
      if not rpc.call_instance_reboot(node_current, instance,
2130
                                      reboot_type, extra_args):
2131
        raise errors.OpExecError("Could not reboot instance")
2132
    else:
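      # full reboot: stop the instance completely, cycle its disks and
      # then start it again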
2133
      if not rpc.call_instance_shutdown(node_current, instance):
2134
        raise errors.OpExecError("Could not shutdown instance for full reboot")
2135
      _ShutdownInstanceDisks(instance, self.cfg)
2136
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2137
      if not rpc.call_instance_start(node_current, instance, extra_args):
2138
        _ShutdownInstanceDisks(instance, self.cfg)
2139
        raise errors.OpExecError("Could not start instance for full reboot")
2140

    
2141
    self.cfg.MarkInstanceUp(instance.name)


class LUShutdownInstance(LogicalUnit):
2145
  """Shutdown an instance.
2146

2147
  """
2148
  HPATH = "instance-stop"
2149
  HTYPE = constants.HTYPE_INSTANCE
2150
  _OP_REQP = ["instance_name"]
2151
  REQ_BGL = False
2152

    
2153
  def ExpandNames(self):
2154
    self._ExpandAndLockInstance()
2155
    self.needed_locks[locking.LEVEL_NODE] = []
2156
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2157

    
2158
  def DeclareLocks(self, level):
2159
    if level == locking.LEVEL_NODE:
2160
      self._LockInstancesNodes()
2161

    
2162
  def BuildHooksEnv(self):
2163
    """Build hooks env.
2164

2165
    This runs on master, primary and secondary nodes of the instance.
2166

2167
    """
2168
    env = _BuildInstanceHookEnvByObject(self.instance)
2169
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2170
          list(self.instance.secondary_nodes))
2171
    return env, nl, nl
2172

    
2173
  def CheckPrereq(self):
2174
    """Check prerequisites.
2175

2176
    This checks that the instance is in the cluster.
2177

2178
    """
2179
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2180
    assert self.instance is not None, \
2181
      "Cannot retrieve locked instance %s" % self.op.instance_name
2182

    
2183
  def Exec(self, feedback_fn):
2184
    """Shutdown the instance.
2185

2186
    """
2187
    instance = self.instance
2188
    node_current = instance.primary_node
2189
    self.cfg.MarkInstanceDown(instance.name)
2190
    if not rpc.call_instance_shutdown(node_current, instance):
2191
      logger.Error("could not shutdown instance")
2192

    
2193
    _ShutdownInstanceDisks(instance, self.cfg)


class LUReinstallInstance(LogicalUnit):
2197
  """Reinstall an instance.
2198

2199
  """
2200
  HPATH = "instance-reinstall"
2201
  HTYPE = constants.HTYPE_INSTANCE
2202
  _OP_REQP = ["instance_name"]
2203
  REQ_BGL = False
2204

    
2205
  def ExpandNames(self):
2206
    self._ExpandAndLockInstance()
2207
    self.needed_locks[locking.LEVEL_NODE] = []
2208
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2209

    
2210
  def DeclareLocks(self, level):
2211
    if level == locking.LEVEL_NODE:
2212
      self._LockInstancesNodes()
2213

    
2214
  def BuildHooksEnv(self):
2215
    """Build hooks env.
2216

2217
    This runs on master, primary and secondary nodes of the instance.
2218

2219
    """
2220
    env = _BuildInstanceHookEnvByObject(self.instance)
2221
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2222
          list(self.instance.secondary_nodes))
2223
    return env, nl, nl
2224

    
2225
  def CheckPrereq(self):
2226
    """Check prerequisites.
2227

2228
    This checks that the instance is in the cluster and is not running.
2229

2230
    """
2231
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2232
    assert instance is not None, \
2233
      "Cannot retrieve locked instance %s" % self.op.instance_name
2234

    
2235
    if instance.disk_template == constants.DT_DISKLESS:
2236
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2237
                                 self.op.instance_name)
2238
    if instance.status != "down":
2239
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2240
                                 self.op.instance_name)
2241
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2242
    if remote_info:
2243
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2244
                                 (self.op.instance_name,
2245
                                  instance.primary_node))
2246

    
2247
    self.op.os_type = getattr(self.op, "os_type", None)
2248
    if self.op.os_type is not None:
2249
      # OS verification
2250
      pnode = self.cfg.GetNodeInfo(
2251
        self.cfg.ExpandNodeName(instance.primary_node))
2252
      if pnode is None:
2253
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2254
                                   instance.primary_node)
2255
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2256
      if not os_obj:
2257
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2258
                                   " primary node"  % self.op.os_type)
2259

    
2260
    self.instance = instance
2261

    
2262
  def Exec(self, feedback_fn):
2263
    """Reinstall the instance.
2264

2265
    """
2266
    inst = self.instance
2267

    
2268
    if self.op.os_type is not None:
2269
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2270
      inst.os = self.op.os_type
2271
      self.cfg.AddInstance(inst)
2272

    
2273
    _StartInstanceDisks(self.cfg, inst, None)
2274
    try:
2275
      feedback_fn("Running the instance OS create scripts...")
2276
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2277
        raise errors.OpExecError("Could not install OS for instance %s"
2278
                                 " on node %s" %
2279
                                 (inst.name, inst.primary_node))
2280
    finally:
2281
      _ShutdownInstanceDisks(inst, self.cfg)


class LURenameInstance(LogicalUnit):
2285
  """Rename an instance.
2286

2287
  """
2288
  HPATH = "instance-rename"
2289
  HTYPE = constants.HTYPE_INSTANCE
2290
  _OP_REQP = ["instance_name", "new_name"]
2291

    
2292
  def BuildHooksEnv(self):
2293
    """Build hooks env.
2294

2295
    This runs on master, primary and secondary nodes of the instance.
2296

2297
    """
2298
    env = _BuildInstanceHookEnvByObject(self.instance)
2299
    env["INSTANCE_NEW_NAME"] = self.op.new_name
2300
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2301
          list(self.instance.secondary_nodes))
2302
    return env, nl, nl
2303

    
2304
  def CheckPrereq(self):
2305
    """Check prerequisites.
2306

2307
    This checks that the instance is in the cluster and is not running.
2308

2309
    """
2310
    instance = self.cfg.GetInstanceInfo(
2311
      self.cfg.ExpandInstanceName(self.op.instance_name))
2312
    if instance is None:
2313
      raise errors.OpPrereqError("Instance '%s' not known" %
2314
                                 self.op.instance_name)
2315
    if instance.status != "down":
2316
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2317
                                 self.op.instance_name)
2318
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2319
    if remote_info:
2320
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2321
                                 (self.op.instance_name,
2322
                                  instance.primary_node))
2323
    self.instance = instance
2324

    
2325
    # new name verification
2326
    name_info = utils.HostInfo(self.op.new_name)
2327

    
2328
    self.op.new_name = new_name = name_info.name
2329
    instance_list = self.cfg.GetInstanceList()
2330
    if new_name in instance_list:
2331
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2332
                                 new_name)
2333

    
2334
    if not getattr(self.op, "ignore_ip", False):
2335
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2336
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
2337
                                   (name_info.ip, new_name))
2338

    
2339

    
2340
  def Exec(self, feedback_fn):
    """Rename the instance.
2342

2343
    """
2344
    inst = self.instance
2345
    old_name = inst.name
2346

    
2347
    if inst.disk_template == constants.DT_FILE:
2348
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2349

    
2350
    self.cfg.RenameInstance(inst.name, self.op.new_name)
2351
    # Change the instance lock. This is definitely safe while we hold the BGL
2352
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2353
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2354

    
2355
    # re-read the instance from the configuration after rename
2356
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
2357

    
2358
    if inst.disk_template == constants.DT_FILE:
2359
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2360
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
2361
                                                old_file_storage_dir,
2362
                                                new_file_storage_dir)
2363

    
2364
      if not result:
2365
        raise errors.OpExecError("Could not connect to node '%s' to rename"
2366
                                 " directory '%s' to '%s' (but the instance"
2367
                                 " has been renamed in Ganeti)" % (
2368
                                 inst.primary_node, old_file_storage_dir,
2369
                                 new_file_storage_dir))
2370

    
2371
      if not result[0]:
2372
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2373
                                 " (but the instance has been renamed in"
2374
                                 " Ganeti)" % (old_file_storage_dir,
2375
                                               new_file_storage_dir))
2376

    
2377
    _StartInstanceDisks(self.cfg, inst, None)
2378
    try:
2379
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2380
                                          "sda", "sdb"):
2381
        msg = ("Could not run OS rename script for instance %s on node %s (but"
2382
               " instance has been renamed in Ganeti)" %
2383
               (inst.name, inst.primary_node))
2384
        logger.Error(msg)
2385
    finally:
2386
      _ShutdownInstanceDisks(inst, self.cfg)


class LURemoveInstance(LogicalUnit):
2390
  """Remove an instance.
2391

2392
  """
2393
  HPATH = "instance-remove"
2394
  HTYPE = constants.HTYPE_INSTANCE
2395
  _OP_REQP = ["instance_name", "ignore_failures"]
2396

    
2397
  def BuildHooksEnv(self):
2398
    """Build hooks env.
2399

2400
    This runs on master, primary and secondary nodes of the instance.
2401

2402
    """
2403
    env = _BuildInstanceHookEnvByObject(self.instance)
2404
    nl = [self.sstore.GetMasterNode()]
2405
    return env, nl, nl
2406

    
2407
  def CheckPrereq(self):
2408
    """Check prerequisites.
2409

2410
    This checks that the instance is in the cluster.
2411

2412
    """
2413
    instance = self.cfg.GetInstanceInfo(
2414
      self.cfg.ExpandInstanceName(self.op.instance_name))
2415
    if instance is None:
2416
      raise errors.OpPrereqError("Instance '%s' not known" %
2417
                                 self.op.instance_name)
2418
    self.instance = instance
2419

    
2420
  def Exec(self, feedback_fn):
2421
    """Remove the instance.
2422

2423
    """
2424
    instance = self.instance
2425
    logger.Info("shutting down instance %s on node %s" %
2426
                (instance.name, instance.primary_node))
2427

    
2428
    if not rpc.call_instance_shutdown(instance.primary_node, instance):
2429
      if self.op.ignore_failures:
2430
        feedback_fn("Warning: can't shutdown instance")
2431
      else:
2432
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2433
                                 (instance.name, instance.primary_node))
2434

    
2435
    logger.Info("removing block devices for instance %s" % instance.name)
2436

    
2437
    if not _RemoveDisks(instance, self.cfg):
2438
      if self.op.ignore_failures:
2439
        feedback_fn("Warning: can't remove instance's disks")
2440
      else:
2441
        raise errors.OpExecError("Can't remove instance's disks")
2442

    
2443
    logger.Info("removing instance %s out of cluster config" % instance.name)
2444

    
2445
    self.cfg.RemoveInstance(instance.name)
2446
    # Remove the new instance from the Ganeti Lock Manager
2447
    self.context.glm.remove(locking.LEVEL_INSTANCE, instance.name)


class LUQueryInstances(NoHooksLU):
2451
  """Logical unit for querying instances.
2452

2453
  """
2454
  _OP_REQP = ["output_fields", "names"]
2455
  REQ_BGL = False
2456

    
2457
  def ExpandNames(self):
2458
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2459
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2460
                               "admin_state", "admin_ram",
2461
                               "disk_template", "ip", "mac", "bridge",
2462
                               "sda_size", "sdb_size", "vcpus", "tags"],
2463
                       dynamic=self.dynamic_fields,
2464
                       selected=self.op.output_fields)
2465

    
2466
    self.needed_locks = {}
2467
    self.share_locks[locking.LEVEL_INSTANCE] = 1
2468
    self.share_locks[locking.LEVEL_NODE] = 1
2469

    
2470
    # TODO: we could lock instances (and nodes) only if the user asked for
2471
    # dynamic fields. For that we need atomic ways to get info for a group of
2472
    # instances from the config, though.
2473
    if not self.op.names:
2474
      self.needed_locks[locking.LEVEL_INSTANCE] = None # Acquire all
2475
    else:
2476
      self.needed_locks[locking.LEVEL_INSTANCE] = \
2477
        _GetWantedInstances(self, self.op.names)
2478

    
2479
    self.needed_locks[locking.LEVEL_NODE] = []
2480
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2481

    
2482
  def DeclareLocks(self, level):
2483
    # TODO: locking of nodes could be avoided when not querying them
2484
    if level == locking.LEVEL_NODE:
2485
      self._LockInstancesNodes()
2486

    
2487
  def CheckPrereq(self):
2488
    """Check prerequisites.
2489

2490
    """
2491
    # This of course is valid only if we locked the instances
2492
    self.wanted = self.needed_locks[locking.LEVEL_INSTANCE]
2493

    
2494
  def Exec(self, feedback_fn):
2495
    """Computes the list of nodes and their attributes.
2496

2497
    """
2498
    instance_names = self.wanted
2499
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2500
                     in instance_names]
2501

    
2502
    # begin data gathering
2503

    
2504
    nodes = frozenset([inst.primary_node for inst in instance_list])
2505

    
2506
    bad_nodes = []
2507
    if self.dynamic_fields.intersection(self.op.output_fields):
2508
      live_data = {}
2509
      node_data = rpc.call_all_instances_info(nodes)
2510
      for name in nodes:
2511
        result = node_data[name]
2512
        if result:
2513
          live_data.update(result)
2514
        elif result == False:
2515
          bad_nodes.append(name)
2516
        # else no instance is alive
2517
    else:
2518
      live_data = dict([(name, {}) for name in instance_names])
2519

    
2520
    # end data gathering
2521

    
2522
    output = []
2523
    for instance in instance_list:
2524
      iout = []
2525
      for field in self.op.output_fields:
2526
        if field == "name":
2527
          val = instance.name
2528
        elif field == "os":
2529
          val = instance.os
2530
        elif field == "pnode":
2531
          val = instance.primary_node
2532
        elif field == "snodes":
2533
          val = list(instance.secondary_nodes)
2534
        elif field == "admin_state":
2535
          val = (instance.status != "down")
2536
        elif field == "oper_state":
2537
          if instance.primary_node in bad_nodes:
2538
            val = None
2539
          else:
2540
            val = bool(live_data.get(instance.name))
2541
        elif field == "status":
2542
          if instance.primary_node in bad_nodes:
2543
            val = "ERROR_nodedown"
2544
          else:
2545
            running = bool(live_data.get(instance.name))
2546
            if running:
2547
              if instance.status != "down":
2548
                val = "running"
2549
              else:
2550
                val = "ERROR_up"
2551
            else:
2552
              if instance.status != "down":
2553
                val = "ERROR_down"
2554
              else:
2555
                val = "ADMIN_down"
2556
        elif field == "admin_ram":
2557
          val = instance.memory
2558
        elif field == "oper_ram":
2559
          if instance.primary_node in bad_nodes:
2560
            val = None
2561
          elif instance.name in live_data:
2562
            val = live_data[instance.name].get("memory", "?")
2563
          else:
2564
            val = "-"
2565
        elif field == "disk_template":
2566
          val = instance.disk_template
2567
        elif field == "ip":
2568
          val = instance.nics[0].ip
2569
        elif field == "bridge":
2570
          val = instance.nics[0].bridge
2571
        elif field == "mac":
2572
          val = instance.nics[0].mac
2573
        elif field == "sda_size" or field == "sdb_size":
2574
          disk = instance.FindDisk(field[:3])
2575
          if disk is None:
2576
            val = None
2577
          else:
2578
            val = disk.size
2579
        elif field == "vcpus":
2580
          val = instance.vcpus
2581
        elif field == "tags":
2582
          val = list(instance.GetTags())
2583
        else:
2584
          raise errors.ParameterError(field)
2585
        iout.append(val)
2586
      output.append(iout)
2587

    
2588
    return output


class LUFailoverInstance(LogicalUnit):
2592
  """Failover an instance.
2593

2594
  """
2595
  HPATH = "instance-failover"
2596
  HTYPE = constants.HTYPE_INSTANCE
2597
  _OP_REQP = ["instance_name", "ignore_consistency"]
2598
  REQ_BGL = False
2599

    
2600
  def ExpandNames(self):
2601
    self._ExpandAndLockInstance()
2602
    self.needed_locks[locking.LEVEL_NODE] = []
2603
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2604

    
2605
  def DeclareLocks(self, level):
2606
    if level == locking.LEVEL_NODE:
2607
      self._LockInstancesNodes()
2608

    
2609
  def BuildHooksEnv(self):
2610
    """Build hooks env.
2611

2612
    This runs on master, primary and secondary nodes of the instance.
2613

2614
    """
2615
    env = {
2616
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2617
      }
2618
    env.update(_BuildInstanceHookEnvByObject(self.instance))
2619
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2620
    return env, nl, nl
2621

    
2622
  def CheckPrereq(self):
2623
    """Check prerequisites.
2624

2625
    This checks that the instance is in the cluster.
2626

2627
    """
2628
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2629
    assert self.instance is not None, \
2630
      "Cannot retrieve locked instance %s" % self.op.instance_name
2631

    
2632
    if instance.disk_template not in constants.DTS_NET_MIRROR:
2633
      raise errors.OpPrereqError("Instance's disk layout is not"
2634
                                 " network mirrored, cannot failover.")
2635

    
2636
    secondary_nodes = instance.secondary_nodes
2637
    if not secondary_nodes:
2638
      raise errors.ProgrammerError("no secondary node but using "
2639
                                   "a mirrored disk template")
2640

    
2641
    target_node = secondary_nodes[0]
2642
    # check memory requirements on the secondary node
2643
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2644
                         instance.name, instance.memory)
2645

    
2646
    # check bridge existence
2647
    brlist = [nic.bridge for nic in instance.nics]
2648
    if not rpc.call_bridges_exist(target_node, brlist):
2649
      raise errors.OpPrereqError("One or more target bridges %s do not"
2650
                                 " exist on destination node '%s'" %
2651
                                 (brlist, target_node))
2652

    
2653
  def Exec(self, feedback_fn):
2654
    """Failover an instance.
2655

2656
    The failover is done by shutting it down on its present node and
2657
    starting it on the secondary.
2658

2659
    """
2660
    instance = self.instance
2661

    
2662
    source_node = instance.primary_node
2663
    target_node = instance.secondary_nodes[0]
2664

    
2665
    feedback_fn("* checking disk consistency between source and target")
2666
    for dev in instance.disks:
2667
      # for drbd, these are drbd over lvm
2668
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2669
        if instance.status == "up" and not self.op.ignore_consistency:
2670
          raise errors.OpExecError("Disk %s is degraded on target node,"
2671
                                   " aborting failover." % dev.iv_name)
2672

    
2673
    feedback_fn("* shutting down instance on source node")
2674
    logger.Info("Shutting down instance %s on node %s" %
2675
                (instance.name, source_node))
2676

    
2677
    if not rpc.call_instance_shutdown(source_node, instance):
2678
      if self.op.ignore_consistency:
2679
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2680
                     " anyway. Please make sure node %s is down"  %
2681
                     (instance.name, source_node, source_node))
2682
      else:
2683
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2684
                                 (instance.name, source_node))
2685

    
2686
    feedback_fn("* deactivating the instance's disks on source node")
2687
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2688
      raise errors.OpExecError("Can't shut down the instance's disks.")
2689

    
2690
    instance.primary_node = target_node
2691
    # distribute new instance config to the other nodes
2692
    self.cfg.Update(instance)
2693

    
2694
    # Only start the instance if it's marked as up
2695
    if instance.status == "up":
2696
      feedback_fn("* activating the instance's disks on target node")
2697
      logger.Info("Starting instance %s on node %s" %
2698
                  (instance.name, target_node))
2699

    
2700
      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2701
                                               ignore_secondaries=True)
2702
      if not disks_ok:
2703
        _ShutdownInstanceDisks(instance, self.cfg)
2704
        raise errors.OpExecError("Can't activate the instance's disks")
2705

    
2706
      feedback_fn("* starting the instance on the target node")
2707
      if not rpc.call_instance_start(target_node, instance, None):
2708
        _ShutdownInstanceDisks(instance, self.cfg)
2709
        raise errors.OpExecError("Could not start instance %s on node %s." %
2710
                                 (instance.name, target_node))


def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2714
  """Create a tree of block devices on the primary node.
2715

2716
  This always creates all devices.
2717

2718
  """
2719
  if device.children:
2720
    for child in device.children:
2721
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2722
        return False
2723

    
2724
  cfg.SetDiskID(device, node)
2725
  new_id = rpc.call_blockdev_create(node, device, device.size,
2726
                                    instance.name, True, info)
2727
  if not new_id:
2728
    return False
2729
  if device.physical_id is None:
2730
    device.physical_id = new_id
2731
  return True


def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2735
  """Create a tree of block devices on a secondary node.
2736

2737
  If this device type has to be created on secondaries, create it and
2738
  all its children.
2739

2740
  If not, just recurse to children keeping the same 'force' value.
2741

2742
  """
2743
  if device.CreateOnSecondary():
2744
    force = True
2745
  if device.children:
2746
    for child in device.children:
2747
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
2748
                                        child, force, info):
2749
        return False
2750

    
2751
  if not force:
2752
    return True
2753
  cfg.SetDiskID(device, node)
2754
  new_id = rpc.call_blockdev_create(node, device, device.size,
2755
                                    instance.name, False, info)
2756
  if not new_id:
2757
    return False
2758
  if device.physical_id is None:
2759
    device.physical_id = new_id
2760
  return True


def _GenerateUniqueNames(cfg, exts):
  """Generate suitable LV names.

  This will generate a logical volume name for each of the given
  suffixes.
2767

2768
  """
2769
  results = []
2770
  for val in exts:
2771
    new_id = cfg.GenerateUniqueID()
2772
    results.append("%s%s" % (new_id, val))
2773
  return results


def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2777
  """Generate a drbd8 device complete with its children.
2778

2779
  """
2780
  port = cfg.AllocatePort()
2781
  vgname = cfg.GetVGName()
2782
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2783
                          logical_id=(vgname, names[0]))
2784
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2785
                          logical_id=(vgname, names[1]))
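  # the 128 MiB volume above holds the DRBD metadata (see also the
  # per-device overhead accounted for in _ComputeDiskSize)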
2786
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2787
                          logical_id = (primary, secondary, port),
2788
                          children = [dev_data, dev_meta],
2789
                          iv_name=iv_name)
2790
  return drbd_dev


def _GenerateDiskTemplate(cfg, template_name,
2794
                          instance_name, primary_node,
2795
                          secondary_nodes, disk_sz, swap_sz,
2796
                          file_storage_dir, file_driver):
2797
  """Generate the entire disk layout for a given template type.
2798

2799
  """
2800
  #TODO: compute space requirements
2801

    
2802
  vgname = cfg.GetVGName()
2803
  if template_name == constants.DT_DISKLESS:
2804
    disks = []
2805
  elif template_name == constants.DT_PLAIN:
2806
    if len(secondary_nodes) != 0:
2807
      raise errors.ProgrammerError("Wrong template configuration")
2808

    
2809
    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2810
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2811
                           logical_id=(vgname, names[0]),
2812
                           iv_name = "sda")
2813
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2814
                           logical_id=(vgname, names[1]),
2815
                           iv_name = "sdb")
2816
    disks = [sda_dev, sdb_dev]
2817
  elif template_name == constants.DT_DRBD8:
2818
    if len(secondary_nodes) != 1:
2819
      raise errors.ProgrammerError("Wrong template configuration")
2820
    remote_node = secondary_nodes[0]
2821
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2822
                                       ".sdb_data", ".sdb_meta"])
2823
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2824
                                         disk_sz, names[0:2], "sda")
2825
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2826
                                         swap_sz, names[2:4], "sdb")
2827
    disks = [drbd_sda_dev, drbd_sdb_dev]
2828
  elif template_name == constants.DT_FILE:
2829
    if len(secondary_nodes) != 0:
2830
      raise errors.ProgrammerError("Wrong template configuration")
2831

    
2832
    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2833
                                iv_name="sda", logical_id=(file_driver,
2834
                                "%s/sda" % file_storage_dir))
2835
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
2836
                                iv_name="sdb", logical_id=(file_driver,
2837
                                "%s/sdb" % file_storage_dir))
2838
    disks = [file_sda_dev, file_sdb_dev]
2839
  else:
2840
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2841
  return disks


def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.
2846

2847
  """
2848
  return "originstname+%s" % instance.name


def _CreateDisks(cfg, instance):
2852
  """Create all disks for an instance.
2853

2854
  This abstracts away some work from AddInstance.
2855

2856
  Args:
2857
    instance: the instance object
2858

2859
  Returns:
2860
    True or False showing the success of the creation process
2861

2862
  """
2863
  info = _GetInstanceInfoText(instance)
2864

    
2865
  if instance.disk_template == constants.DT_FILE:
2866
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2867
    result = rpc.call_file_storage_dir_create(instance.primary_node,
2868
                                              file_storage_dir)
2869

    
2870
    if not result:
2871
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
2872
      return False
2873

    
2874
    if not result[0]:
2875
      logger.Error("failed to create directory '%s'" % file_storage_dir)
2876
      return False
2877

    
2878
  for device in instance.disks:
2879
    logger.Info("creating volume %s for instance %s" %
2880
                (device.iv_name, instance.name))
2881
    #HARDCODE
2882
    for secondary_node in instance.secondary_nodes:
2883
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2884
                                        device, False, info):
2885
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2886
                     (device.iv_name, device, secondary_node))
2887
        return False
2888
    #HARDCODE
2889
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2890
                                    instance, device, info):
2891
      logger.Error("failed to create volume %s on primary!" %
2892
                   device.iv_name)
2893
      return False
2894

    
2895
  return True


def _RemoveDisks(instance, cfg):
2899
  """Remove all disks for an instance.
2900

2901
  This abstracts away some work from `AddInstance()` and
2902
  `RemoveInstance()`. Note that in case some of the devices couldn't
2903
  be removed, the removal will continue with the other ones (compare
2904
  with `_CreateDisks()`).
2905

2906
  Args:
2907
    instance: the instance object
2908

2909
  Returns:
2910
    True or False showing the success of the removal process
2911

2912
  """
2913
  logger.Info("removing block devices for instance %s" % instance.name)
2914

    
2915
  result = True
2916
  for device in instance.disks:
2917
    for node, disk in device.ComputeNodeTree(instance.primary_node):
2918
      cfg.SetDiskID(disk, node)
2919
      if not rpc.call_blockdev_remove(node, disk):
2920
        logger.Error("could not remove block device %s on node %s,"
2921
                     " continuing anyway" %
2922
                     (device.iv_name, node))
2923
        result = False
2924

    
2925
  if instance.disk_template == constants.DT_FILE:
2926
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2927
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
2928
                                            file_storage_dir):
2929
      logger.Error("could not remove directory '%s'" % file_storage_dir)
2930
      result = False
2931

    
2932
  return result


def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group

  This is currently hard-coded for the two-drive layout.

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    # 256 MB are added for drbd metadata, 128 MB for each drbd device
    constants.DT_DRBD8: disk_size + swap_size + 256,
    constants.DT_FILE: None,
  }
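  # Illustrative example (not from the original code): a drbd8 instance
  # with disk_size=10240 and swap_size=4096 needs
  # 10240 + 4096 + 256 = 14592 MB of free space in the volume group on
  # each of its nodes.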

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]


class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync", "ip_check", "mac"]

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    disks = [{"size": self.op.disk_size, "mode": "w"},
             {"size": self.op.swap_size, "mode": "w"}]
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
             "bridge": self.op.bridge}]
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.op.vcpus,
                     mem_size=self.op.mem_size,
                     disks=disks,
                     nics=nics,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    logger.ToStdout("Selected nodes for the instance: %s" %
                    (", ".join(ial.nodes),))
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
                (self.op.instance_name, self.op.iallocator, ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.instance_status,
      os_type=self.op.os_type,
      memory=self.op.mem_size,
      vcpus=self.op.vcpus,
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
    ))

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl


  def CheckPrereq(self):
    """Check prerequisites.

    """
    # set optional parameters to none if they don't exist
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
                 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
                 "vnc_bind_address"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

    #### instance parameters check

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)

    self.op.instance_name = instance_name = hostname1.name
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    # ip validity checks
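    # accepted values for the ip parameter: None or "none" (no IP assigned),
    # "auto" (use the address the instance name resolves to), or an explicit
    # address, which is validated below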
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = self.op.ip = inst_ip

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname1.ip, instance_name))

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")
    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory not a relative"
                                 " path")
    #### allocator run

    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                 self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
                                   self.op.snode)
      elif snode_name == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(snode_name)

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node" % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")


    # bridge check on primary node
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.op.mem_size)

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

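    # hypervisors listed in HTS_REQ_PORT need a cluster-wide unique TCP port
    # reserved for the instance (used e.g. for its VNC console); it is taken
    # from the configuration's port pool via AllocatePort()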
    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    if self.op.vnc_bind_address is None:
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
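    # i.e. <file storage dir>/<user-supplied subdir>/<instance name>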
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.sstore.GetFileStorageDir(),
                                        string_file_storage_dir, instance))


    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            hvm_acpi=self.op.hvm_acpi,
                            hvm_pae=self.op.hvm_pae,
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
                            vnc_bind_address=self.op.vnc_bind_address,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Add the new instance to the Ganeti Lock Manager
    self.context.glm.add(locking.LEVEL_INSTANCE, instance)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      # Remove the new instance from the Ganeti Lock Manager
      self.context.glm.remove(locking.LEVEL_INSTANCE, iobj.name)
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                           src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    logger.ToStdout("Selected new secondary for the instance: %s" %
                    self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    self.op.instance_name = instance.name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
    self.op.remote_node = remote_node

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
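      # one new LV of the original size for the data plus a 128 MB LV for
      # the DRBD metadata of the rebuilt mirror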
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([pri_node, new_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in pri_node, new_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s on %s" % (dev.iv_name, pri_node))
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        raise errors.OpExecError("Can't find device %s on node %s" %
                                 (dev.iv_name, pri_node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
      if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      size = dev.size
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in dev.children:
        if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], new_node))

      iv_names[dev.iv_name] = (dev, dev.children)

    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for dev in instance.disks:
      size = dev.size
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
      # create new devices on new_node
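      # only the secondary node changes in the logical_id; the remaining
      # elements are reused from the old device (the DRBD port information)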
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=(pri_node, new_node,
                                          dev.logical_id[2]),
                              children=dev.children)
      if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
                                        new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new DRBD on"
                                 " node '%s'" % new_node)

    for dev in instance.disks:
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for %s on old node" % dev.iv_name)
      cfg.SetDiskID(dev, old_node)
      if not rpc.call_blockdev_shutdown(old_node, dev):
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    done = 0
    for dev in instance.disks:
      cfg.SetDiskID(dev, pri_node)
      # set the physical (unique in bdev terms) id to None, meaning
      # detach from network
      dev.physical_id = (None,) * len(dev.physical_id)
      # and 'find' the device, which will 'fix' it to match the
      # standalone state
      if rpc.call_blockdev_find(pri_node, dev):
        done += 1
      else:
        warning("Failed to detach drbd %s from network, unusual case" %
                dev.iv_name)

    if not done:
      # no detaches succeeded (very unlikely)
      raise errors.OpExecError("Can't detach at least one DRBD from old node")

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev in instance.disks:
      dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    failures = []
    for dev in instance.disks:
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
      # since the attach is smart, it's enough to 'find' the device,
      # it will automatically activate the network, if the physical_id
      # is correct
      cfg.SetDiskID(dev, pri_node)
      if not rpc.call_blockdev_find(pri_node, dev):
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
                "please do a gnt-instance info to see the status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        if not rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance

    # Activate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
      self.proc.ChainOpCode(op)

    if instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")

    ret = fn(feedback_fn)

    # Deactivate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
      self.proc.ChainOpCode(op)

    return ret


class LUGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk", "amount"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    self.op.instance_name = instance.name

    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing.")

    if instance.FindDisk(self.op.disk) is None:
      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                 (self.op.disk, instance.name))