#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform
import logging
import copy

from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_MASTER: the LU needs to run on the master node
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_MASTER = True
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    if not self.cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    if self.REQ_MASTER:
      master = self.cfg.GetMasterNode()
      if master != utils.HostInfo().name:
        raise errors.OpPrereqError("Commands must be run on the master"
                                   " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:
      - Use an empty dict if you don't need any lock
      - If you don't need any lock at a particular level omit that level
      - Don't put anything for the BGL level
      - If you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples:
    # Acquire all nodes and one instance
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
    }
    # Acquire just two nodes
    self.needed_locks = {
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
    }
    # Acquire no locks
    self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level.

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's a need to calculate some locks only after the ones at
    lower levels have been acquired. This function is called just before
    acquiring locks at a particular level, but after acquiring the ones at
    lower levels, and permits such calculations. It can be used to modify
    self.needed_locks, and by default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have the 'GANETI_' prefix, as this will
    be handled by the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    If there are no nodes to return, an empty list (and not None) should
    be used.

    Note that if the HPATH for an LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged, but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    Args:
      phase: the hooks phase that has just been run
      hook_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: the previous result this LU had, or None in the PRE phase.

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instances' nodes, or
    to just lock primary or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to:

    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we really have been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


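# Illustrative sketch only (not part of the original module): a minimal
# concurrent LU wired together the way the LogicalUnit docstrings above
# describe.  The class name and the "instance_name" opcode field used here are
# hypothetical; the flow ExpandNames -> DeclareLocks -> CheckPrereq -> Exec and
# the _ExpandAndLockInstance/_LockInstancesNodes helpers are the ones defined
# above.
#
#   class LUExampleInstanceOp(NoHooksLU):
#     _OP_REQP = ["instance_name"]
#     REQ_BGL = False
#
#     def ExpandNames(self):
#       # expands self.op.instance_name and declares the instance lock
#       self._ExpandAndLockInstance()
#       # node locks are computed later, once the instance lock is held
#       self.needed_locks[locking.LEVEL_NODE] = []
#       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
#
#     def DeclareLocks(self, level):
#       if level == locking.LEVEL_NODE:
#         self._LockInstancesNodes()
#
#     def CheckPrereq(self):
#       self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
#       assert self.instance is not None, \
#         "Cannot retrieve locked instance %s" % self.op.instance_name
#
#     def Exec(self, feedback_fn):
#       feedback_fn("Would act on instance %s" % self.instance.name)

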
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: Non-empty list of node names (strings) to be expanded

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields
    selected: Fields selected by the user

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))


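# Illustrative sketch only (not part of the original module): how an LU's
# ExpandNames typically calls _CheckOutputFields; the field names below are
# hypothetical.
#
#   _CheckOutputFields(static=["name"],
#                      dynamic=["mfree", "dtotal"],
#                      selected=self.op.output_fields)
#
# Any field in self.op.output_fields that is neither static nor dynamic makes
# the helper raise errors.OpPrereqError.

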
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings
  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env


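# Illustrative sketch only (not part of the original module): the environment
# produced by _BuildInstanceHookEnv for a hypothetical single-NIC instance;
# all names and values below are invented for the example.
#
#   _BuildInstanceHookEnv("inst1.example.tld", "node1.example.tld",
#                         ["node2.example.tld"], "debian-etch", "up", 128, 1,
#                         [("192.0.2.10", "xen-br0", "aa:00:00:11:22:33")])
#   ==> {
#     "OP_TARGET": "inst1.example.tld",
#     "INSTANCE_NAME": "inst1.example.tld",
#     "INSTANCE_PRIMARY": "node1.example.tld",
#     "INSTANCE_SECONDARIES": "node2.example.tld",
#     "INSTANCE_OS_TYPE": "debian-etch",
#     "INSTANCE_STATUS": "up",
#     "INSTANCE_MEMORY": 128,
#     "INSTANCE_VCPUS": 1,
#     "INSTANCE_NIC0_IP": "192.0.2.10",
#     "INSTANCE_NIC0_BRIDGE": "xen-br0",
#     "INSTANCE_NIC0_HWADDR": "aa:00:00:11:22:33",
#     "INSTANCE_NIC_COUNT": 1,
#   }
#
# The hooks runner later prefixes these keys before exporting them to the
# hook scripts.

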
def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override
  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _CheckInstanceBridgesExist(lu, instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("one or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    if not self.rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
                      (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    if not node_result:
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
      return True

    # checks config file checksum
    # checks ssh to any

    if 'filelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result['nodelist'][node]))
    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result['node-net-test'][node]))

    hyp_result = node_result.get('hypervisor', None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
602
                      node_instance, feedback_fn):
603
    """Verify an instance.
604

605
    This function checks to see if the required block devices are
606
    available on the instance's node.
607

608
    """
609
    bad = False
610

    
611
    node_current = instanceconfig.primary_node
612

    
613
    node_vol_should = {}
614
    instanceconfig.MapLVsByNode(node_vol_should)
615

    
616
    for node in node_vol_should:
617
      for volume in node_vol_should[node]:
618
        if node not in node_vol_is or volume not in node_vol_is[node]:
619
          feedback_fn("  - ERROR: volume %s missing on node %s" %
620
                          (volume, node))
621
          bad = True
622

    
623
    if not instanceconfig.status == 'down':
624
      if (node_current not in node_instance or
625
          not instance in node_instance[node_current]):
626
        feedback_fn("  - ERROR: instance %s not running on node %s" %
627
                        (instance, node_current))
628
        bad = True
629

    
630
    for node in node_instance:
631
      if (not node == node_current):
632
        if instance in node_instance[node]:
633
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
634
                          (instance, node))
635
          bad = True
636

    
637
    return bad
638

    
639
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
640
    """Verify if there are any unknown volumes in the cluster.
641

642
    The .os, .swap and backup volumes are ignored. All other volumes are
643
    reported as unknown.
644

645
    """
646
    bad = False
647

    
648
    for node in node_vol_is:
649
      for volume in node_vol_is[node]:
650
        if node not in node_vol_should or volume not in node_vol_should[node]:
651
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
652
                      (volume, node))
653
          bad = True
654
    return bad
655

    
656
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
657
    """Verify the list of running instances.
658

659
    This checks what instances are running but unknown to the cluster.
660

661
    """
662
    bad = False
663
    for node in node_instance:
664
      for runninginstance in node_instance[node]:
665
        if runninginstance not in instancelist:
666
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
667
                          (runninginstance, node))
668
          bad = True
669
    return bad
670

    
671
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
672
    """Verify N+1 Memory Resilience.
673

674
    Check that if one single node dies we can still start all the instances it
675
    was primary for.
676

677
    """
678
    bad = False
679

    
680
    for node, nodeinfo in node_info.iteritems():
681
      # This code checks that every node which is now listed as secondary has
682
      # enough memory to host all instances it is supposed to should a single
683
      # other node in the cluster fail.
684
      # FIXME: not ready for failover to an arbitrary node
685
      # FIXME: does not support file-backed instances
686
      # WARNING: we currently take into account down instances as well as up
687
      # ones, considering that even if they're down someone might want to start
688
      # them even in the event of a node failure.
689
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
690
        needed_mem = 0
691
        for instance in instances:
692
          needed_mem += instance_cfg[instance].memory
693
        if nodeinfo['mfree'] < needed_mem:
694
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
695
                      " failovers should node %s fail" % (node, prinode))
696
          bad = True
697
    return bad
698

    
  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase; if they fail, their
    output is logged in the verify output and the verification fails.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = []
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
    all_vglist = self.rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': hypervisors,
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo]
      }
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())
    all_rversion = self.rpc.call_version(nodelist)
    all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
                                        self.cfg.GetHypervisorType())

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  This is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result =  self._VerifyInstance(instance, inst_config, node_volume,
                                     node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result, handle it, and send some
    nicely-formatted feedback back to the user.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: previous Exec result

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn("    Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
      # node_volume
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
        # lvs is an error string here, so we cannot inspect its volumes
        continue
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    if not self.rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      # TODO: sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = self.rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      if not self.rpc.call_node_start_master(master, False):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    # FIXME: This only works because there is only one parameter that can be
    # changed or removed.
    if not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      node_list = self.acquired_locks[locking.LEVEL_NODE]
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name != self.cfg.GetVGName():
      self.cfg.SetVGName(self.op.vg_name)
    else:
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")


def _WaitForSync(lu, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      lu.proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        lu.proc.LogWarning("Can't compute data for node %s/%s" %
                           (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
    _CheckOutputFields(static=[],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary

      Args:
        node_list: a list with the names of all nodes
        rlist: a map with node names as keys and OS objects as values

      Returns:
        map: a map with osnames as keys and as value another map, with
             nodes as keys and lists of OS objects as values
             e.g. {"debian-etch": {"node1": [<object>,...],
                                   "node2": [<object>,]}
                  }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if not nr:
        continue
      for os_obj in nr:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.acquired_locks[locking.LEVEL_NODE]
    node_data = self.rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    self.context.RemoveNode(node.name)

    self.rpc.call_node_leave_cluster(node.name)


1414
class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    self.dynamic_fields = frozenset([
      "dtotal", "dfree",
      "mtotal", "mnode", "mfree",
      "bootid",
      "ctotal",
      ])

    self.static_fields = frozenset([
      "name", "pinst_cnt", "sinst_cnt",
      "pinst_list", "sinst_list",
      "pip", "sip", "tags",
      "serial_no",
      ])

    _CheckOutputFields(static=self.static_fields,
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted


  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in the _GetWantedNodes,
    # if non empty, and if empty, there's no validation to do
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      nodenames = self.wanted
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
    else:
      nodenames = all_info.keys()
    nodelist = [all_info[name] for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                          self.cfg.GetHypervisorType())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field == "serial_no":
          val = node.serial_no
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output
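
# Locking note for the query LUs in this module: when only static
# (configuration-only) fields are requested, no locks are taken at all.
# For example, with the sets defined in LUQueryNodes.ExpandNames:
#   static.issuperset(["name", "pip"])   -> True  -> do_locking = False
#   static.issuperset(["name", "mfree"]) -> False -> do_locking = True
# Only in the second case is self.wanted added to needed_locks[LEVEL_NODE].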


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False

  def ExpandNames(self):
    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    result = self.rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
                                    keyarray[2],
                                    keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      if not self.rpc.call_node_has_ip_address(new_node.name,
                                               new_node.secondary_ip):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    node_verify_list = [self.cfg.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
                                       self.cfg.GetClusterName())
    for verifier in node_verify_list:
      if not result[verifier]:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                 " for remote verification" % verifier)
      if result[verifier]['nodelist']:
        for failed in result[verifier]['nodelist']:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, result[verifier]['nodelist'][failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = self.rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = []
    if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      result = self.rpc.call_upload_file([node], fname)
      if not result[node]:
        logger.Error("could not copy file %s to node %s" % (fname, node))

    if self.op.readd:
      self.context.ReaddNode(new_node)
    else:
      self.context.AddNode(new_node)
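
# The add-node sequence above is, in order: check the node daemon version,
# push the cluster SSH host/user keys via call_node_add, update /etc/hosts,
# optionally verify the secondary IP, run a node-verify pass from the master,
# and finally redistribute /etc/hosts, known_hosts and (for HVM-enabled
# clusters) the VNC password file before registering the node in the context.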


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.cfg.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.cfg.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      "hypervisor_type": self.cfg.GetHypervisorType(),
      "enabled_hypervisors": self.cfg.GetClusterInfo().enabled_hypervisors,
      }

    return result
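
# LUQueryClusterInfo deliberately takes no locks and does not require the
# master role (REQ_MASTER = False): everything it returns comes straight from
# the configuration object and compile-time constants.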


class LUQueryConfigValues(NoHooksLU):
  """Return configuration values.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

    static_fields = ["cluster_name", "master_node"]
    _CheckOutputFields(static=static_fields,
                       dynamic=[],
                       selected=self.op.output_fields)

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    values = []
    for field in self.op.output_fields:
      if field == "cluster_name":
        values.append(self.cfg.GetClusterName())
      elif field == "master_node":
        values.append(self.cfg.GetMasterNode())
      else:
        raise errors.ParameterError(field)
    return values


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    a pair (disks_ok, device_info), where device_info is the list of
    (host, instance_visible_name, node_visible_name) tuples with the
    mapping from node devices to instance devices
  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two-pass mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info
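
# _AssembleInstanceDisks always returns the (disks_ok, device_info) pair; a
# typical caller checks the flag and only then uses the mapping, e.g.:
#   disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
#   if not disks_ok:
#     raise errors.OpExecError("Cannot activate block devices")
# (this is exactly the pattern used by LUActivateInstanceDisks.Exec above).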


def _StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")
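
# Callers that pass force=None (e.g. LUReinstallInstance and LURenameInstance
# below) get the same assembly behaviour as force=False but skip the hint
# about retrying with '--force', since that option does not apply to them.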


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    _SafeShutdownInstanceDisks(self, instance)


def _SafeShutdownInstanceDisks(lu, instance):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  """
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
                                      [instance.hypervisor])
  ins_l = ins_l[instance.primary_node]
  if not type(ins_l) is list:
    raise errors.OpExecError("Can't contact node '%s'" %
                             instance.primary_node)

  if instance.name in ins_l:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")

  _ShutdownInstanceDisks(lu, instance)


def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are ignored
  when computing the return value.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      if not lu.rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node: C{str}
  @param node: the node to check
  @type reason: C{str}
  @param reason: string to use in the error message
  @type requested: C{int}
  @param requested: the amount of memory in MiB to check for
  @type hypervisor: C{str}
  @param hypervisor: the hypervisor to ask for memory stats
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
      we cannot check the node

  """
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                             " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                             " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                             " needed %s MiB, available %s MiB" %
                             (node, reason, requested, free_mem))
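
# _CheckNodeFreeMemory is the standard memory pre-check for operations that
# will (re)start an instance on a node; see for example
# LUStartupInstance.CheckPrereq and LUFailoverInstance.CheckPrereq below,
# which both call it with the instance's memory size and hypervisor.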


class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

    _CheckNodeFreeMemory(self, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory, instance.hypervisor)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self, instance, force)

    if not self.rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(self, instance)
      raise errors.OpExecError("Could not start instance")
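
# Note the ordering in LUStartupInstance.Exec: the instance is marked "up" in
# the configuration before the disks are assembled and the start RPC is sent,
# so a failed start leaves an instance that is administratively up but not
# running (reported as ERROR_down by the status field of LUQueryInstances).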


class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # lock only the primary node, unless we are doing a full reboot
      primary_only = self.op.reboot_type != constants.INSTANCE_REBOOT_FULL
      self._LockInstancesNodes(primary_only=primary_only)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      if not self.rpc.call_instance_reboot(node_current, instance,
                                           reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      if not self.rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(self, instance)
      _StartInstanceDisks(self, instance, ignore_secondaries)
      if not self.rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)
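
# Reboot semantics: INSTANCE_REBOOT_SOFT and INSTANCE_REBOOT_HARD are handled
# by a single call_instance_reboot RPC on the primary node, while
# INSTANCE_REBOOT_FULL is implemented as shutdown + disk deactivation +
# disk activation + start, i.e. a full stop/start cycle.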


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    if not self.rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(self, instance)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   self.op.pnode)
      os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node"  % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.Update(inst)

    _StartInstanceDisks(self, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not self.rpc.call_instance_os_add(inst.primary_node, inst,
                                           "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))


  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
                                                     old_file_storage_dir,
                                                     new_file_storage_dir)

      if not result:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self, inst, None)
    try:
      if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
                                               old_name,
                                               "sda", "sdb"):
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.cfg.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    if not _RemoveDisks(self, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False

  def ExpandNames(self):
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    self.static_fields = frozenset([
      "name", "os", "pnode", "snodes",
      "admin_state", "admin_ram",
      "disk_template", "ip", "mac", "bridge",
      "sda_size", "sdb_size", "vcpus", "tags",
      "network_port",
      "serial_no", "hypervisor", "hvparams",
      ] + ["hv/%s" % name for name in constants.HVS_PARAMETERS])
    _CheckOutputFields(static=self.static_fields,
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.do_locking:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    all_info = self.cfg.GetAllInstancesInfo()
    if self.do_locking:
      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
    elif self.wanted != locking.ALL_SET:
      instance_names = self.wanted
      missing = set(instance_names).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some instances were removed before retrieving their data: %s"
          % missing)
    else:
      instance_names = all_info.keys()
    instance_list = [all_info[iname] for iname in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])
    hv_list = list(set([inst.hypervisor for inst in instance_list]))

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    HVPREFIX = "hv/"
    output = []
    for instance in instance_list:
      iout = []
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          disk = instance.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        elif field == "vcpus":
          val = instance.vcpus
        elif field == "tags":
          val = list(instance.GetTags())
        elif field == "serial_no":
          val = instance.serial_no
        elif field == "network_port":
          val = instance.network_port
        elif (field.startswith(HVPREFIX) and
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
          val = i_hv.get(field[len(HVPREFIX):], None)
        elif field == "hvparams":
          val = i_hv
        elif field == "hypervisor":
          val = instance.hypervisor
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output
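
# The "status" field computed above can take the values: "running",
# "ADMIN_down" (stopped and marked down), "ERROR_up" (running but marked
# down), "ERROR_down" (marked up but not running) and "ERROR_nodedown"
# (the primary node could not be contacted).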


class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
                         instance.name, instance.memory,
                         instance.hypervisor)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not self.rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s do not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self, dev, target_node, False):
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not self.rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down"  %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logger.Info("Starting instance %s on node %s" %
                  (instance.name, target_node))

      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not self.rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))
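
# Failover sequence summary: verify disk consistency on the target, shut the
# instance down on the source node, deactivate its disks there, flip
# instance.primary_node in the configuration, and only if the instance was
# marked "up" reassemble the disks and start it on the former secondary.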


def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
        return False

  lu.cfg.SetDiskID(device, node)
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, True, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(lu, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  lu.cfg.SetDiskID(device, node)
  new_id = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True
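
# _CreateBlockDevOnPrimary and _CreateBlockDevOnSecondary both recurse over
# device.children bottom-up: children are created first, then the device
# itself.  On the secondary, nothing is created until some device in the tree
# requests it via CreateOnSecondary(); from that point the 'force' flag is
# propagated to all children so the whole subtree exists on the secondary.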


def _GenerateUniqueNames(lu, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results
2950

    
2951

    
2952
def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
2953
                         p_minor, s_minor):
2954
  """Generate a drbd8 device complete with its children.
2955

2956
  """
2957
  port = lu.cfg.AllocatePort()
2958
  vgname = lu.cfg.GetVGName()
2959
  shared_secret = lu.cfg.GenerateDRBDSecret()
2960
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2961
                          logical_id=(vgname, names[0]))
2962
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2963
                          logical_id=(vgname, names[1]))
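  # The logical_id of an LD_DRBD8 device is the 6-tuple
  # (primary_node, secondary_node, port, p_minor, s_minor, shared_secret);
  # the data and metadata LVs above become its children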
2964
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2965
                          logical_id=(primary, secondary, port,
2966
                                      p_minor, s_minor,
2967
                                      shared_secret),
2968
                          children=[dev_data, dev_meta],
2969
                          iv_name=iv_name)
2970
  return drbd_dev
2971

    
2972

    
2973
def _GenerateDiskTemplate(lu, template_name,
2974
                          instance_name, primary_node,
2975
                          secondary_nodes, disk_sz, swap_sz,
2976
                          file_storage_dir, file_driver):
2977
  """Generate the entire disk layout for a given template type.
2978

2979
  """
2980
  #TODO: compute space requirements
2981

    
2982
  vgname = lu.cfg.GetVGName()
2983
  if template_name == constants.DT_DISKLESS:
2984
    disks = []
2985
  elif template_name == constants.DT_PLAIN:
2986
    if len(secondary_nodes) != 0:
2987
      raise errors.ProgrammerError("Wrong template configuration")
2988

    
2989
    names = _GenerateUniqueNames(lu, [".sda", ".sdb"])
2990
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2991
                           logical_id=(vgname, names[0]),
2992
                           iv_name = "sda")
2993
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2994
                           logical_id=(vgname, names[1]),
2995
                           iv_name = "sdb")
2996
    disks = [sda_dev, sdb_dev]
2997
  elif template_name == constants.DT_DRBD8:
2998
    if len(secondary_nodes) != 1:
2999
      raise errors.ProgrammerError("Wrong template configuration")
3000
    remote_node = secondary_nodes[0]
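    # four minors are needed: a (primary, secondary) pair for the sda drbd
    # device and another pair for the sdb drbd device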
3001
    (minor_pa, minor_pb,
3002
     minor_sa, minor_sb) = lu.cfg.AllocateDRBDMinor(
3003
      [primary_node, primary_node, remote_node, remote_node], instance_name)
3004

    
3005
    names = _GenerateUniqueNames(lu, [".sda_data", ".sda_meta",
3006
                                      ".sdb_data", ".sdb_meta"])
3007
    drbd_sda_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3008
                                        disk_sz, names[0:2], "sda",
3009
                                        minor_pa, minor_sa)
3010
    drbd_sdb_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3011
                                        swap_sz, names[2:4], "sdb",
3012
                                        minor_pb, minor_sb)
3013
    disks = [drbd_sda_dev, drbd_sdb_dev]
3014
  elif template_name == constants.DT_FILE:
3015
    if len(secondary_nodes) != 0:
3016
      raise errors.ProgrammerError("Wrong template configuration")
3017

    
3018
    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
3019
                                iv_name="sda", logical_id=(file_driver,
3020
                                "%s/sda" % file_storage_dir))
3021
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
3022
                                iv_name="sdb", logical_id=(file_driver,
3023
                                "%s/sdb" % file_storage_dir))
3024
    disks = [file_sda_dev, file_sdb_dev]
3025
  else:
3026
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3027
  return disks
3028

    
3029

    
3030
def _GetInstanceInfoText(instance):
3031
  """Compute that text that should be added to the disk's metadata.
3032

3033
  """
3034
  return "originstname+%s" % instance.name
3035

    
3036

    
3037
def _CreateDisks(lu, instance):
3038
  """Create all disks for an instance.
3039

3040
  This abstracts away some work from AddInstance.
3041

3042
  Args:
3043
    instance: the instance object
3044

3045
  Returns:
3046
    True or False showing the success of the creation process
3047

3048
  """
3049
  info = _GetInstanceInfoText(instance)
3050

    
3051
  if instance.disk_template == constants.DT_FILE:
3052
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3053
    result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3054
                                                 file_storage_dir)
3055

    
3056
    if not result:
3057
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
3058
      return False
3059

    
3060
    if not result[0]:
3061
      logger.Error("failed to create directory '%s'" % file_storage_dir)
3062
      return False
3063

    
3064
  for device in instance.disks:
3065
    logger.Info("creating volume %s for instance %s" %
3066
                (device.iv_name, instance.name))
3067
    #HARDCODE
3068
    for secondary_node in instance.secondary_nodes:
3069
      if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3070
                                        device, False, info):
3071
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
3072
                     (device.iv_name, device, secondary_node))
3073
        return False
3074
    #HARDCODE
3075
    if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3076
                                    instance, device, info):
3077
      logger.Error("failed to create volume %s on primary!" %
3078
                   device.iv_name)
3079
      return False
3080

    
3081
  return True
3082

    
3083

    
3084
def _RemoveDisks(lu, instance):
3085
  """Remove all disks for an instance.
3086

3087
  This abstracts away some work from `AddInstance()` and
3088
  `RemoveInstance()`. Note that in case some of the devices couldn't
3089
  be removed, the removal will continue with the other ones (compare
3090
  with `_CreateDisks()`).
3091

3092
  Args:
3093
    instance: the instance object
3094

3095
  Returns:
3096
    True or False showing the success of the removal process
3097

3098
  """
3099
  logger.Info("removing block devices for instance %s" % instance.name)
3100

    
3101
  result = True
3102
  for device in instance.disks:
3103
    for node, disk in device.ComputeNodeTree(instance.primary_node):
3104
      lu.cfg.SetDiskID(disk, node)
3105
      if not lu.rpc.call_blockdev_remove(node, disk):
3106
        logger.Error("could not remove block device %s on node %s,"
3107
                     " continuing anyway" %
3108
                     (device.iv_name, node))
3109
        result = False
3110

    
3111
  if instance.disk_template == constants.DT_FILE:
3112
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3113
    if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3114
                                               file_storage_dir):
3115
      logger.Error("could not remove directory '%s'" % file_storage_dir)
3116
      result = False
3117

    
3118
  return result
3119

    
3120

    
3121
def _ComputeDiskSize(disk_template, disk_size, swap_size):
3122
  """Compute disk size requirements in the volume group
3123

3124
  This is currently hard-coded for the two-drive layout.
3125

3126
  """
3127
  # Required free disk space as a function of disk and swap space
3128
  req_size_dict = {
3129
    constants.DT_DISKLESS: None,
3130
    constants.DT_PLAIN: disk_size + swap_size,
3131
    # 256 MB are added for drbd metadata, 128 MB for each drbd device
3132
    constants.DT_DRBD8: disk_size + swap_size + 256,
3133
    constants.DT_FILE: None,
3134
  }
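  # Worked example: a DT_DRBD8 instance with disk_size=10240 and
  # swap_size=4096 needs 10240 + 4096 + 256 = 14592 MB of free space in the
  # volume group on each node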
3135

    
3136
  if disk_template not in req_size_dict:
3137
    raise errors.ProgrammerError("Disk template '%s' size requirement"
3138
                                 " is unknown" %  disk_template)
3139

    
3140
  return req_size_dict[disk_template]
3141

    
3142

    
3143
def _CheckHVParams(lu, nodenames, hvname, hvparams):
3144
  """Hypervisor parameter validation.
3145

3146
  This function abstracts the hypervisor parameter validation to be
3147
  used in both instance create and instance modify.
3148

3149
  @type lu: L{LogicalUnit}
3150
  @param lu: the logical unit for which we check
3151
  @type nodenames: list
3152
  @param nodenames: the list of nodes on which we should check
3153
  @type hvname: string
3154
  @param hvname: the name of the hypervisor we should use
3155
  @type hvparams: dict
3156
  @param hvparams: the parameters which we need to check
3157
  @raise errors.OpPrereqError: if the parameters are not valid
3158

3159
  """
3160
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3161
                                                  hvname,
3162
                                                  hvparams)
3163
  for node in nodenames:
3164
    info = hvinfo.get(node, None)
3165
    if not info or not isinstance(info, (tuple, list)):
3166
      raise errors.OpPrereqError("Cannot get current information"
3167
                                 " from node '%s' (%s)" % (node, info))
3168
    if not info[0]:
3169
      raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3170
                                 " %s" % info[1])
3171

    
3172

    
3173
class LUCreateInstance(LogicalUnit):
3174
  """Create an instance.
3175

3176
  """
3177
  HPATH = "instance-add"
3178
  HTYPE = constants.HTYPE_INSTANCE
3179
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
3180
              "disk_template", "swap_size", "mode", "start", "vcpus",
3181
              "wait_for_sync", "ip_check", "mac", "hvparams"]
3182
  REQ_BGL = False
3183

    
3184
  def _ExpandNode(self, node):
3185
    """Expands and checks one node name.
3186

3187
    """
3188
    node_full = self.cfg.ExpandNodeName(node)
3189
    if node_full is None:
3190
      raise errors.OpPrereqError("Unknown node %s" % node)
3191
    return node_full
3192

    
3193
  def ExpandNames(self):
3194
    """ExpandNames for CreateInstance.
3195

3196
    Figure out the right locks for instance creation.
3197

3198
    """
3199
    self.needed_locks = {}
3200

    
3201
    # set optional parameters to None if they don't exist
3202
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3203
      if not hasattr(self.op, attr):
3204
        setattr(self.op, attr, None)
3205

    
3206
    # cheap checks, mostly valid constants given
3207

    
3208
    # verify creation mode
3209
    if self.op.mode not in (constants.INSTANCE_CREATE,
3210
                            constants.INSTANCE_IMPORT):
3211
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3212
                                 self.op.mode)
3213

    
3214
    # disk template and mirror node verification
3215
    if self.op.disk_template not in constants.DISK_TEMPLATES:
3216
      raise errors.OpPrereqError("Invalid disk template name")
3217

    
3218
    if self.op.hypervisor is None:
3219
      self.op.hypervisor = self.cfg.GetHypervisorType()
3220

    
3221
    enabled_hvs = self.cfg.GetClusterInfo().enabled_hypervisors
3222
    if self.op.hypervisor not in enabled_hvs:
3223
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3224
                                 " cluster (%s)" % (self.op.hypervisor,
3225
                                  ",".join(enabled_hvs)))
3226

    
3227
    # check hypervisor parameter syntax (locally)
3228

    
3229
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3230
    hv_type.CheckParameterSyntax(self.op.hvparams)
3231

    
3232
    #### instance parameters check
3233

    
3234
    # instance name verification
3235
    hostname1 = utils.HostInfo(self.op.instance_name)
3236
    self.op.instance_name = instance_name = hostname1.name
3237

    
3238
    # this is just a preventive check, but someone might still add this
3239
    # instance in the meantime, and creation will fail at lock-add time
3240
    if instance_name in self.cfg.GetInstanceList():
3241
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3242
                                 instance_name)
3243

    
3244
    self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3245

    
3246
    # ip validity checks
3247
    ip = getattr(self.op, "ip", None)
3248
    if ip is None or ip.lower() == "none":
3249
      inst_ip = None
3250
    elif ip.lower() == "auto":
3251
      inst_ip = hostname1.ip
3252
    else:
3253
      if not utils.IsValidIP(ip):
3254
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
3255
                                   " like a valid IP" % ip)
3256
      inst_ip = ip
3257
    self.inst_ip = self.op.ip = inst_ip
3258
    # used in CheckPrereq for ip ping check
3259
    self.check_ip = hostname1.ip
3260

    
3261
    # MAC address verification
3262
    if self.op.mac != "auto":
3263
      if not utils.IsValidMac(self.op.mac.lower()):
3264
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
3265
                                   self.op.mac)
3266

    
3267
    # file storage checks
3268
    if (self.op.file_driver and
3269
        not self.op.file_driver in constants.FILE_DRIVER):
3270
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
3271
                                 self.op.file_driver)
3272

    
3273
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3274
      raise errors.OpPrereqError("File storage directory path not absolute")
3275

    
3276
    ### Node/iallocator related checks
3277
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
3278
      raise errors.OpPrereqError("One and only one of iallocator and primary"
3279
                                 " node must be given")
3280

    
3281
    if self.op.iallocator:
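      # the allocator may place the instance on any node, so all node locks
      # have to be acquired up front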
3282
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3283
    else:
3284
      self.op.pnode = self._ExpandNode(self.op.pnode)
3285
      nodelist = [self.op.pnode]
3286
      if self.op.snode is not None:
3287
        self.op.snode = self._ExpandNode(self.op.snode)
3288
        nodelist.append(self.op.snode)
3289
      self.needed_locks[locking.LEVEL_NODE] = nodelist
3290

    
3291
    # in case of import lock the source node too
3292
    if self.op.mode == constants.INSTANCE_IMPORT:
3293
      src_node = getattr(self.op, "src_node", None)
3294
      src_path = getattr(self.op, "src_path", None)
3295

    
3296
      if src_node is None or src_path is None:
3297
        raise errors.OpPrereqError("Importing an instance requires source"
3298
                                   " node and path options")
3299

    
3300
      if not os.path.isabs(src_path):
3301
        raise errors.OpPrereqError("The source path must be absolute")
3302

    
3303
      self.op.src_node = src_node = self._ExpandNode(src_node)
3304
      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3305
        self.needed_locks[locking.LEVEL_NODE].append(src_node)
3306

    
3307
    else: # INSTANCE_CREATE
3308
      if getattr(self.op, "os_type", None) is None:
3309
        raise errors.OpPrereqError("No guest OS specified")
3310

    
3311
  def _RunAllocator(self):
3312
    """Run the allocator based on input opcode.
3313

3314
    """
3315
    disks = [{"size": self.op.disk_size, "mode": "w"},
3316
             {"size": self.op.swap_size, "mode": "w"}]
3317
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3318
             "bridge": self.op.bridge}]
3319
    ial = IAllocator(self,
3320
                     mode=constants.IALLOCATOR_MODE_ALLOC,
3321
                     name=self.op.instance_name,
3322
                     disk_template=self.op.disk_template,
3323
                     tags=[],
3324
                     os=self.op.os_type,
3325
                     vcpus=self.op.vcpus,
3326
                     mem_size=self.op.mem_size,
3327
                     disks=disks,
3328
                     nics=nics,
3329
                     )
3330

    
3331
    ial.Run(self.op.iallocator)
3332

    
3333
    if not ial.success:
3334
      raise errors.OpPrereqError("Can't compute nodes using"
3335
                                 " iallocator '%s': %s" % (self.op.iallocator,
3336
                                                           ial.info))
3337
    if len(ial.nodes) != ial.required_nodes:
3338
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3339
                                 " of nodes (%s), required %s" %
3340
                                 (self.op.iallocator, len(ial.nodes),
3341
                                  ial.required_nodes))
3342
    self.op.pnode = ial.nodes[0]
3343
    logger.ToStdout("Selected nodes for the instance: %s" %
3344
                    (", ".join(ial.nodes),))
3345
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3346
                (self.op.instance_name, self.op.iallocator, ial.nodes))
3347
    if ial.required_nodes == 2:
3348
      self.op.snode = ial.nodes[1]
3349

    
3350
  def BuildHooksEnv(self):
3351
    """Build hooks env.
3352

3353
    This runs on master, primary and secondary nodes of the instance.
3354

3355
    """
3356
    env = {
3357
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3358
      "INSTANCE_DISK_SIZE": self.op.disk_size,
3359
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
3360
      "INSTANCE_ADD_MODE": self.op.mode,
3361
      }
3362
    if self.op.mode == constants.INSTANCE_IMPORT:
3363
      env["INSTANCE_SRC_NODE"] = self.op.src_node
3364
      env["INSTANCE_SRC_PATH"] = self.op.src_path
3365
      env["INSTANCE_SRC_IMAGE"] = self.src_image
3366

    
3367
    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3368
      primary_node=self.op.pnode,
3369
      secondary_nodes=self.secondaries,
3370
      status=self.instance_status,
3371
      os_type=self.op.os_type,
3372
      memory=self.op.mem_size,
3373
      vcpus=self.op.vcpus,
3374
      nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3375
    ))
3376

    
3377
    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3378
          self.secondaries)
3379
    return env, nl, nl
3380

    
3381

    
3382
  def CheckPrereq(self):
3383
    """Check prerequisites.
3384

3385
    """
3386
    if (not self.cfg.GetVGName() and
3387
        self.op.disk_template not in constants.DTS_NOT_LVM):
3388
      raise errors.OpPrereqError("Cluster does not support lvm-based"
3389
                                 " instances")
3390

    
3391

    
3392
    if self.op.mode == constants.INSTANCE_IMPORT:
3393
      src_node = self.op.src_node
3394
      src_path = self.op.src_path
3395

    
3396
      export_info = self.rpc.call_export_info(src_node, src_path)
3397

    
3398
      if not export_info:
3399
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
3400

    
3401
      if not export_info.has_section(constants.INISECT_EXP):
3402
        raise errors.ProgrammerError("Corrupted export config")
3403

    
3404
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
3405
      if (int(ei_version) != constants.EXPORT_VERSION):
3406
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3407
                                   (ei_version, constants.EXPORT_VERSION))
3408

    
3409
      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3410
        raise errors.OpPrereqError("Can't import instance with more than"
3411
                                   " one data disk")
3412

    
3413
      # FIXME: are the old os-es, disk sizes, etc. useful?
3414
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3415
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3416
                                                         'disk0_dump'))
3417
      self.src_image = diskimage
3418

    
3419
    # ip ping checks (we use the same ip that was resolved in ExpandNames)
3420

    
3421
    if self.op.start and not self.op.ip_check:
3422
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3423
                                 " adding an instance in start mode")
3424

    
3425
    if self.op.ip_check:
3426
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3427
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3428
                                   (self.check_ip, self.op.instance_name))
3429

    
3430
    # bridge verification
3431
    bridge = getattr(self.op, "bridge", None)
3432
    if bridge is None:
3433
      self.op.bridge = self.cfg.GetDefBridge()
3434
    else:
3435
      self.op.bridge = bridge
3436

    
3437
    #### allocator run
3438

    
3439
    if self.op.iallocator is not None:
3440
      self._RunAllocator()
3441

    
3442
    #### node related checks
3443

    
3444
    # check primary node
3445
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3446
    assert self.pnode is not None, \
3447
      "Cannot retrieve locked node %s" % self.op.pnode
3448
    self.secondaries = []
3449

    
3450
    # mirror node verification
3451
    if self.op.disk_template in constants.DTS_NET_MIRROR:
3452
      if self.op.snode is None:
3453
        raise errors.OpPrereqError("The networked disk templates need"
3454
                                   " a mirror node")
3455
      if self.op.snode == pnode.name:
3456
        raise errors.OpPrereqError("The secondary node cannot be"
3457
                                   " the primary node.")
3458
      self.secondaries.append(self.op.snode)
3459

    
3460
    nodenames = [pnode.name] + self.secondaries
3461

    
3462
    req_size = _ComputeDiskSize(self.op.disk_template,
3463
                                self.op.disk_size, self.op.swap_size)
3464

    
3465
    # Check lv size requirements
3466
    if req_size is not None:
3467
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3468
                                         self.op.hypervisor)
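      # nodeinfo maps each node name to a dict of node data; only the
      # 'vg_free' value (in MB) is used here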
3469
      for node in nodenames:
3470
        info = nodeinfo.get(node, None)
3471
        if not info:
3472
          raise errors.OpPrereqError("Cannot get current information"
3473
                                     " from node '%s'" % node)
3474
        vg_free = info.get('vg_free', None)
3475
        if not isinstance(vg_free, int):
3476
          raise errors.OpPrereqError("Can't compute free disk space on"
3477
                                     " node %s" % node)
3478
        if req_size > info['vg_free']:
3479
          raise errors.OpPrereqError("Not enough disk space on target node %s."
3480
                                     " %d MB available, %d MB required" %
3481
                                     (node, info['vg_free'], req_size))
3482

    
3483
    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
3484

    
3485
    # os verification
3486
    os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
3487
    if not os_obj:
3488
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
3489
                                 " primary node"  % self.op.os_type)
3490

    
3491
    # bridge check on primary node
3492
    if not self.rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3493
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
3494
                                 " destination node '%s'" %
3495
                                 (self.op.bridge, pnode.name))
3496

    
3497
    # memory check on primary node
3498
    if self.op.start:
3499
      _CheckNodeFreeMemory(self, self.pnode.name,
3500
                           "creating instance %s" % self.op.instance_name,
3501
                           self.op.mem_size, self.op.hypervisor)
3502

    
3503
    if self.op.start:
3504
      self.instance_status = 'up'
3505
    else:
3506
      self.instance_status = 'down'
3507

    
3508
  def Exec(self, feedback_fn):
3509
    """Create and add the instance to the cluster.
3510

3511
    """
3512
    instance = self.op.instance_name
3513
    pnode_name = self.pnode.name
3514

    
3515
    if self.op.mac == "auto":
3516
      mac_address = self.cfg.GenerateMAC()
3517
    else:
3518
      mac_address = self.op.mac
3519

    
3520
    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3521
    if self.inst_ip is not None:
3522
      nic.ip = self.inst_ip
3523

    
3524
    ht_kind = self.op.hypervisor
3525
    if ht_kind in constants.HTS_REQ_PORT:
3526
      network_port = self.cfg.AllocatePort()
3527
    else:
3528
      network_port = None
3529

    
3530
    ##if self.op.vnc_bind_address is None:
3531
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3532

    
3533
    # this is needed because os.path.join does not accept None arguments
3534
    if self.op.file_storage_dir is None:
3535
      string_file_storage_dir = ""
3536
    else:
3537
      string_file_storage_dir = self.op.file_storage_dir
3538

    
3539
    # build the full file storage dir path
3540
    file_storage_dir = os.path.normpath(os.path.join(
3541
                                        self.cfg.GetFileStorageDir(),
3542
                                        string_file_storage_dir, instance))
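    # i.e. <cluster file storage dir>/<file_storage_dir>/<instance name>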
3543

    
3544

    
3545
    disks = _GenerateDiskTemplate(self,
3546
                                  self.op.disk_template,
3547
                                  instance, pnode_name,
3548
                                  self.secondaries, self.op.disk_size,
3549
                                  self.op.swap_size,
3550
                                  file_storage_dir,
3551
                                  self.op.file_driver)
3552

    
3553
    iobj = objects.Instance(name=instance, os=self.op.os_type,
3554
                            primary_node=pnode_name,
3555
                            memory=self.op.mem_size,
3556
                            vcpus=self.op.vcpus,
3557
                            nics=[nic], disks=disks,
3558
                            disk_template=self.op.disk_template,
3559
                            status=self.instance_status,
3560
                            network_port=network_port,
3561
                            hvparams=self.op.hvparams,
3562
                            hypervisor=self.op.hypervisor,
3563
                            )
3564

    
3565
    feedback_fn("* creating instance disks...")
3566
    if not _CreateDisks(self, iobj):
3567
      _RemoveDisks(self, iobj)
3568
      self.cfg.ReleaseDRBDMinors(instance)
3569
      raise errors.OpExecError("Device creation failed, reverting...")
3570

    
3571
    feedback_fn("adding instance %s to cluster config" % instance)
3572

    
3573
    self.cfg.AddInstance(iobj)
3574
    # Declare that we don't want to remove the instance lock anymore, as we've
3575
    # added the instance to the config
3576
    del self.remove_locks[locking.LEVEL_INSTANCE]
3577
    # Remove the temporary assignments for the instance's drbds
3578
    self.cfg.ReleaseDRBDMinors(instance)
3579

    
3580
    if self.op.wait_for_sync:
3581
      disk_abort = not _WaitForSync(self, iobj)
3582
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
3583
      # make sure the disks are not degraded (still sync-ing is ok)
3584
      time.sleep(15)
3585
      feedback_fn("* checking mirrors status")
3586
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
3587
    else:
3588
      disk_abort = False
3589

    
3590
    if disk_abort:
3591
      _RemoveDisks(self, iobj)
3592
      self.cfg.RemoveInstance(iobj.name)
3593
      # Make sure the instance lock gets removed
3594
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
3595
      raise errors.OpExecError("There are some degraded disks for"
3596
                               " this instance")
3597

    
3598
    feedback_fn("creating os for instance %s on node %s" %
3599
                (instance, pnode_name))
3600

    
3601
    if iobj.disk_template != constants.DT_DISKLESS:
3602
      if self.op.mode == constants.INSTANCE_CREATE:
3603
        feedback_fn("* running the instance OS create scripts...")
3604
        if not self.rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3605
          raise errors.OpExecError("could not add os for instance %s"
3606
                                   " on node %s" %
3607
                                   (instance, pnode_name))
3608

    
3609
      elif self.op.mode == constants.INSTANCE_IMPORT:
3610
        feedback_fn("* running the instance OS import scripts...")
3611
        src_node = self.op.src_node
3612
        src_image = self.src_image
3613
        cluster_name = self.cfg.GetClusterName()
3614
        if not self.rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3615
                                                src_node, src_image,
3616
                                                cluster_name):
3617
          raise errors.OpExecError("Could not import os for instance"
3618
                                   " %s on node %s" %
3619
                                   (instance, pnode_name))
3620
      else:
3621
        # also checked in the prereq part
3622
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3623
                                     % self.op.mode)
3624

    
3625
    if self.op.start:
3626
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3627
      feedback_fn("* starting instance...")
3628
      if not self.rpc.call_instance_start(pnode_name, iobj, None):
3629
        raise errors.OpExecError("Could not start instance")
3630

    
3631

    
3632
class LUConnectConsole(NoHooksLU):
3633
  """Connect to an instance's console.
3634

3635
  This is somewhat special in that it returns the command line that
3636
  you need to run on the master node in order to connect to the
3637
  console.
3638

3639
  """
3640
  _OP_REQP = ["instance_name"]
3641
  REQ_BGL = False
3642

    
3643
  def ExpandNames(self):
3644
    self._ExpandAndLockInstance()
3645

    
3646
  def CheckPrereq(self):
3647
    """Check prerequisites.
3648

3649
    This checks that the instance is in the cluster.
3650

3651
    """
3652
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3653
    assert self.instance is not None, \
3654
      "Cannot retrieve locked instance %s" % self.op.instance_name
3655

    
3656
  def Exec(self, feedback_fn):
3657
    """Connect to the console of an instance
3658

3659
    """
3660
    instance = self.instance
3661
    node = instance.primary_node
3662

    
3663
    node_insts = self.rpc.call_instance_list([node],
3664
                                             [instance.hypervisor])[node]
3665
    if node_insts is False:
3666
      raise errors.OpExecError("Can't connect to node %s." % node)
3667

    
3668
    if instance.name not in node_insts:
3669
      raise errors.OpExecError("Instance %s is not running." % instance.name)
3670

    
3671
    logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3672

    
3673
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
3674
    console_cmd = hyper.GetShellCommandForConsole(instance)
3675

    
3676
    # build ssh cmdline
3677
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3678

    
3679

    
3680
class LUReplaceDisks(LogicalUnit):
3681
  """Replace the disks of an instance.
3682

3683
  """
3684
  HPATH = "mirrors-replace"
3685
  HTYPE = constants.HTYPE_INSTANCE
3686
  _OP_REQP = ["instance_name", "mode", "disks"]
3687
  REQ_BGL = False
3688

    
3689
  def ExpandNames(self):
3690
    self._ExpandAndLockInstance()
3691

    
3692
    if not hasattr(self.op, "remote_node"):
3693
      self.op.remote_node = None
3694

    
3695
    ia_name = getattr(self.op, "iallocator", None)
3696
    if ia_name is not None:
3697
      if self.op.remote_node is not None:
3698
        raise errors.OpPrereqError("Give either the iallocator or the new"
3699
                                   " secondary, not both")
3700
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3701
    elif self.op.remote_node is not None:
3702
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3703
      if remote_node is None:
3704
        raise errors.OpPrereqError("Node '%s' not known" %
3705
                                   self.op.remote_node)
3706
      self.op.remote_node = remote_node
3707
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
3708
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3709
    else:
3710
      self.needed_locks[locking.LEVEL_NODE] = []
3711
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3712

    
3713
  def DeclareLocks(self, level):
3714
    # If we're not already locking all nodes in the set we have to declare the
3715
    # instance's primary/secondary nodes.
3716
    if (level == locking.LEVEL_NODE and
3717
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
3718
      self._LockInstancesNodes()
3719

    
3720
  def _RunAllocator(self):
3721
    """Compute a new secondary node using an IAllocator.
3722

3723
    """
3724
    ial = IAllocator(self,
3725
                     mode=constants.IALLOCATOR_MODE_RELOC,
3726
                     name=self.op.instance_name,
3727
                     relocate_from=[self.sec_node])
3728

    
3729
    ial.Run(self.op.iallocator)
3730

    
3731
    if not ial.success:
3732
      raise errors.OpPrereqError("Can't compute nodes using"
3733
                                 " iallocator '%s': %s" % (self.op.iallocator,
3734
                                                           ial.info))
3735
    if len(ial.nodes) != ial.required_nodes:
3736
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3737
                                 " of nodes (%s), required %s" %
3738
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
3739
    self.op.remote_node = ial.nodes[0]
3740
    logger.ToStdout("Selected new secondary for the instance: %s" %
3741
                    self.op.remote_node)
3742

    
3743
  def BuildHooksEnv(self):
3744
    """Build hooks env.
3745

3746
    This runs on the master, the primary and all the secondaries.
3747

3748
    """
3749
    env = {
3750
      "MODE": self.op.mode,
3751
      "NEW_SECONDARY": self.op.remote_node,
3752
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
3753
      }
3754
    env.update(_BuildInstanceHookEnvByObject(self.instance))
3755
    nl = [
3756
      self.cfg.GetMasterNode(),
3757
      self.instance.primary_node,
3758
      ]
3759
    if self.op.remote_node is not None:
3760
      nl.append(self.op.remote_node)
3761
    return env, nl, nl
3762

    
3763
  def CheckPrereq(self):
3764
    """Check prerequisites.
3765

3766
    This checks that the instance is in the cluster.
3767

3768
    """
3769
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3770
    assert instance is not None, \
3771
      "Cannot retrieve locked instance %s" % self.op.instance_name
3772
    self.instance = instance
3773

    
3774
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3775
      raise errors.OpPrereqError("Instance's disk layout is not"
3776
                                 " network mirrored.")
3777

    
3778
    if len(instance.secondary_nodes) != 1:
3779
      raise errors.OpPrereqError("The instance has a strange layout,"
3780
                                 " expected one secondary but found %d" %
3781
                                 len(instance.secondary_nodes))
3782

    
3783
    self.sec_node = instance.secondary_nodes[0]
3784

    
3785
    ia_name = getattr(self.op, "iallocator", None)
3786
    if ia_name is not None:
3787
      self._RunAllocator()
3788

    
3789
    remote_node = self.op.remote_node
3790
    if remote_node is not None:
3791
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3792
      assert self.remote_node_info is not None, \
3793
        "Cannot retrieve locked node %s" % remote_node
3794
    else:
3795
      self.remote_node_info = None
3796
    if remote_node == instance.primary_node:
3797
      raise errors.OpPrereqError("The specified node is the primary node of"
3798
                                 " the instance.")
3799
    elif remote_node == self.sec_node:
3800
      if self.op.mode == constants.REPLACE_DISK_SEC:
3801
        # this is for DRBD8, where we can't execute the same mode of
3802
        # replacement as for drbd7 (no different port allocated)
3803
        raise errors.OpPrereqError("Same secondary given, cannot execute"
3804
                                   " replacement")
3805
    if instance.disk_template == constants.DT_DRBD8:
3806
      if (self.op.mode == constants.REPLACE_DISK_ALL and
3807
          remote_node is not None):
3808
        # switch to replace secondary mode
3809
        self.op.mode = constants.REPLACE_DISK_SEC
3810

    
3811
      if self.op.mode == constants.REPLACE_DISK_ALL:
3812
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3813
                                   " secondary disk replacement, not"
3814
                                   " both at once")
3815
      elif self.op.mode == constants.REPLACE_DISK_PRI:
3816
        if remote_node is not None:
3817
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3818
                                     " the secondary while doing a primary"
3819
                                     " node disk replacement")
3820
        self.tgt_node = instance.primary_node
3821
        self.oth_node = instance.secondary_nodes[0]
3822
      elif self.op.mode == constants.REPLACE_DISK_SEC:
3823
        self.new_node = remote_node # this can be None, in which case
3824
                                    # we don't change the secondary
3825
        self.tgt_node = instance.secondary_nodes[0]
3826
        self.oth_node = instance.primary_node
3827
      else:
3828
        raise errors.ProgrammerError("Unhandled disk replace mode")
3829

    
3830
    for name in self.op.disks:
3831
      if instance.FindDisk(name) is None:
3832
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3833
                                   (name, instance.name))
3834

    
3835
  def _ExecD8DiskOnly(self, feedback_fn):
3836
    """Replace a disk on the primary or secondary for dbrd8.
3837

3838
    The algorithm for replace is quite complicated:
3839
      - for each disk to be replaced:
3840
        - create new LVs on the target node with unique names
3841
        - detach old LVs from the drbd device
3842
        - rename old LVs to name_replaced.<time_t>
3843
        - rename new LVs to old LVs
3844
        - attach the new LVs (with the old names now) to the drbd device
3845
      - wait for sync across all devices
3846
      - for each modified disk:
3847
        - remove old LVs (which have the name name_replaced.<time_t>)
3848

3849
    Failures are not very well handled.
3850

3851
    """
3852
    steps_total = 6
3853
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3854
    instance = self.instance
3855
    iv_names = {}
3856
    vgname = self.cfg.GetVGName()
3857
    # start of work
3858
    cfg = self.cfg
3859
    tgt_node = self.tgt_node
3860
    oth_node = self.oth_node
3861

    
3862
    # Step: check device activation
3863
    self.proc.LogStep(1, steps_total, "check device existence")
3864
    info("checking volume groups")
3865
    my_vg = cfg.GetVGName()
3866
    results = self.rpc.call_vg_list([oth_node, tgt_node])
3867
    if not results:
3868
      raise errors.OpExecError("Can't list volume groups on the nodes")
3869
    for node in oth_node, tgt_node:
3870
      res = results.get(node, False)
3871
      if not res or my_vg not in res:
3872
        raise errors.OpExecError("Volume group '%s' not found on %s" %
3873
                                 (my_vg, node))
3874
    for dev in instance.disks:
3875
      if not dev.iv_name in self.op.disks:
3876
        continue
3877
      for node in tgt_node, oth_node:
3878
        info("checking %s on %s" % (dev.iv_name, node))
3879
        cfg.SetDiskID(dev, node)
3880
        if not self.rpc.call_blockdev_find(node, dev):
3881
          raise errors.OpExecError("Can't find device %s on node %s" %
3882
                                   (dev.iv_name, node))
3883

    
3884
    # Step: check other node consistency
3885
    self.proc.LogStep(2, steps_total, "check peer consistency")
3886
    for dev in instance.disks:
3887
      if not dev.iv_name in self.op.disks:
3888
        continue
3889
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3890
      if not _CheckDiskConsistency(self, dev, oth_node,
3891
                                   oth_node==instance.primary_node):
3892
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3893
                                 " to replace disks on this node (%s)" %
3894
                                 (oth_node, tgt_node))
3895

    
3896
    # Step: create new storage
3897
    self.proc.LogStep(3, steps_total, "allocate new storage")
3898
    for dev in instance.disks:
3899
      if not dev.iv_name in self.op.disks:
3900
        continue
3901
      size = dev.size
3902
      cfg.SetDiskID(dev, tgt_node)
3903
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3904
      names = _GenerateUniqueNames(self, lv_names)
3905
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3906
                             logical_id=(vgname, names[0]))
3907
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3908
                             logical_id=(vgname, names[1]))
3909
      new_lvs = [lv_data, lv_meta]
3910
      old_lvs = dev.children
3911
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3912
      info("creating new local storage on %s for %s" %
3913
           (tgt_node, dev.iv_name))
3914
      # since we *always* want to create this LV, we use the
3915
      # _Create...OnPrimary (which forces the creation), even if we
3916
      # are talking about the secondary node
3917
      for new_lv in new_lvs:
3918
        if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
3919
                                        _GetInstanceInfoText(instance)):
3920
          raise errors.OpExecError("Failed to create new LV named '%s' on"
3921
                                   " node '%s'" %
3922
                                   (new_lv.logical_id[1], tgt_node))
3923

    
3924
    # Step: for each lv, detach+rename*2+attach
3925
    self.proc.LogStep(4, steps_total, "change drbd configuration")
3926
    for dev, old_lvs, new_lvs in iv_names.itervalues():
3927
      info("detaching %s drbd from local storage" % dev.iv_name)
3928
      if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3929
        raise errors.OpExecError("Can't detach drbd from local storage on node"
3930
                                 " %s for device %s" % (tgt_node, dev.iv_name))
3931
      #dev.children = []
3932
      #cfg.Update(instance)
3933

    
3934
      # ok, we created the new LVs, so now we know we have the needed
3935
      # storage; as such, we proceed on the target node to rename
3936
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3937
      # using the assumption that logical_id == physical_id (which in
3938
      # turn is the unique_id on that node)
3939

    
3940
      # FIXME(iustin): use a better name for the replaced LVs
3941
      temp_suffix = int(time.time())
3942
      ren_fn = lambda d, suff: (d.physical_id[0],
3943
                                d.physical_id[1] + "_replaced-%s" % suff)
3944
      # build the rename list based on what LVs exist on the node
3945
      rlist = []
3946
      for to_ren in old_lvs:
3947
        find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
3948
        if find_res is not None: # device exists
3949
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3950

    
3951
      info("renaming the old LVs on the target node")
3952
      if not self.rpc.call_blockdev_rename(tgt_node, rlist):
3953
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3954
      # now we rename the new LVs to the old LVs
3955
      info("renaming the new LVs on the target node")
3956
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3957
      if not self.rpc.call_blockdev_rename(tgt_node, rlist):
3958
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3959

    
3960
      for old, new in zip(old_lvs, new_lvs):
3961
        new.logical_id = old.logical_id
3962
        cfg.SetDiskID(new, tgt_node)
3963

    
3964
      for disk in old_lvs:
3965
        disk.logical_id = ren_fn(disk, temp_suffix)
3966
        cfg.SetDiskID(disk, tgt_node)
3967

    
3968
      # now that the new lvs have the old name, we can add them to the device
3969
      info("adding new mirror component on %s" % tgt_node)
3970
      if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3971
        for new_lv in new_lvs:
3972
          if not self.rpc.call_blockdev_remove(tgt_node, new_lv):
3973
            warning("Can't rollback device %s", hint="manually cleanup unused"
3974
                    " logical volumes")
3975
        raise errors.OpExecError("Can't add local storage to drbd")
3976

    
3977
      dev.children = new_lvs
3978
      cfg.Update(instance)
3979

    
3980
    # Step: wait for sync
3981

    
3982
    # this can fail as the old devices are degraded and _WaitForSync
3983
    # does a combined result over all disks, so we don't check its
3984
    # return value
3985
    self.proc.LogStep(5, steps_total, "sync devices")
3986
    _WaitForSync(self, instance, unlock=True)
3987

    
3988
    # so check manually all the devices
3989
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3990
      cfg.SetDiskID(dev, instance.primary_node)
3991
      is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5]
3992
      if is_degr:
3993
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
3994

    
3995
    # Step: remove old storage
3996
    self.proc.LogStep(6, steps_total, "removing old storage")
3997
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3998
      info("remove logical volumes for %s" % name)
3999
      for lv in old_lvs:
4000
        cfg.SetDiskID(lv, tgt_node)
4001
        if not self.rpc.call_blockdev_remove(tgt_node, lv):
4002
          warning("Can't remove old LV", hint="manually remove unused LVs")
4003
          continue
4004

    
4005
  def _ExecD8Secondary(self, feedback_fn):
4006
    """Replace the secondary node for drbd8.
4007

4008
    The algorithm for replace is quite complicated:
4009
      - for all disks of the instance:
4010
        - create new LVs on the new node with the same names
4011
        - shutdown the drbd device on the old secondary
4012
        - disconnect the drbd network on the primary
4013
        - create the drbd device on the new secondary
4014
        - network attach the drbd on the primary, using an artifice:
4015
          the drbd code for Attach() will connect to the network if it
4016
          finds a device which is connected to the right local disks but
4017
          not network enabled
4018
      - wait for sync across all devices
4019
      - remove all disks from the old secondary
4020

4021
    Failures are not very well handled.
4022

4023
    """
4024
    steps_total = 6
4025
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4026
    instance = self.instance
4027
    iv_names = {}
4028
    vgname = self.cfg.GetVGName()
4029
    # start of work
4030
    cfg = self.cfg
4031
    old_node = self.tgt_node
4032
    new_node = self.new_node
4033
    pri_node = instance.primary_node
4034

    
4035
    # Step: check device activation
4036
    self.proc.LogStep(1, steps_total, "check device existence")
4037
    info("checking volume groups")
4038
    my_vg = cfg.GetVGName()
4039
    results = self.rpc.call_vg_list([pri_node, new_node])
4040
    if not results:
4041
      raise errors.OpExecError("Can't list volume groups on the nodes")
4042
    for node in pri_node, new_node:
4043
      res = results.get(node, False)
4044
      if not res or my_vg not in res:
4045
        raise errors.OpExecError("Volume group '%s' not found on %s" %
4046
                                 (my_vg, node))
4047
    for dev in instance.disks:
4048
      if not dev.iv_name in self.op.disks:
4049
        continue
4050
      info("checking %s on %s" % (dev.iv_name, pri_node))
4051
      cfg.SetDiskID(dev, pri_node)
4052
      if not self.rpc.call_blockdev_find(pri_node, dev):
4053
        raise errors.OpExecError("Can't find device %s on node %s" %
4054
                                 (dev.iv_name, pri_node))
4055

    
4056
    # Step: check other node consistency
4057
    self.proc.LogStep(2, steps_total, "check peer consistency")
4058
    for dev in instance.disks:
4059
      if not dev.iv_name in self.op.disks:
4060
        continue
4061
      info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4062
      if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4063
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
4064
                                 " unsafe to replace the secondary" %
4065
                                 pri_node)
4066

    
4067
    # Step: create new storage
4068
    self.proc.LogStep(3, steps_total, "allocate new storage")
4069
    for dev in instance.disks:
4070
      size = dev.size
4071
      info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4072
      # since we *always* want to create this LV, we use the
4073
      # _Create...OnPrimary (which forces the creation), even if we
4074
      # are talking about the secondary node
4075
      for new_lv in dev.children:
4076
        if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4077
                                        _GetInstanceInfoText(instance)):
4078
          raise errors.OpExecError("Failed to create new LV named '%s' on"
4079
                                   " node '%s'" %
4080
                                   (new_lv.logical_id[1], new_node))
4081

    
4082

    
4083
    # Step 4: drbd minors and drbd setup changes
4084
    # after this, we must manually remove the drbd minors on both the
4085
    # error and the success paths
4086
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4087
                                   instance.name)
4088
    logging.debug("Allocated minors %s" % (minors,))
4089
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
4090
    for dev, new_minor in zip(instance.disks, minors):
4091
      size = dev.size
4092
      info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4093
      # create new devices on new_node
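      # only the remote end of the logical_id changes: the old secondary is
      # replaced by new_node and its minor by new_minor; the port, the
      # primary's minor and the shared secret are kept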
4094
      if pri_node == dev.logical_id[0]:
4095
        new_logical_id = (pri_node, new_node,
4096
                          dev.logical_id[2], dev.logical_id[3], new_minor,
4097
                          dev.logical_id[5])
4098
      else:
4099
        new_logical_id = (new_node, pri_node,
4100
                          dev.logical_id[2], new_minor, dev.logical_id[4],
4101
                          dev.logical_id[5])
4102
      iv_names[dev.iv_name] = (dev, dev.children, new_logical_id)
4103
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4104
                    new_logical_id)
4105
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4106
                              logical_id=new_logical_id,
4107
                              children=dev.children)
4108
      if not _CreateBlockDevOnSecondary(self, new_node, instance,
4109
                                        new_drbd, False,
4110
                                        _GetInstanceInfoText(instance)):
4111
        self.cfg.ReleaseDRBDMinors(instance.name)
4112
        raise errors.OpExecError("Failed to create new DRBD on"
4113
                                 " node '%s'" % new_node)
4114

    
4115
    for dev in instance.disks:
4116
      # we have new devices, shutdown the drbd on the old secondary
4117
      info("shutting down drbd for %s on old node" % dev.iv_name)
4118
      cfg.SetDiskID(dev, old_node)
4119
      if not self.rpc.call_blockdev_shutdown(old_node, dev):
4120
        warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4121
                hint="Please cleanup this device manually as soon as possible")
4122

    
4123
    info("detaching primary drbds from the network (=> standalone)")
4124
    done = 0
4125
    for dev in instance.disks:
4126
      cfg.SetDiskID(dev, pri_node)
4127
      # set the network part of the physical (unique in bdev terms) id
4128
      # to None, meaning detach from network
4129
      dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4130
      # and 'find' the device, which will 'fix' it to match the
4131
      # standalone state
4132
      if self.rpc.call_blockdev_find(pri_node, dev):
4133
        done += 1
4134
      else:
4135
        warning("Failed to detach drbd %s from network, unusual case" %
4136
                dev.iv_name)
4137

    
4138
    if not done:
4139
      # no detaches succeeded (very unlikely)
4140
      self.cfg.ReleaseDRBDMinors(instance.name)
4141
      raise errors.OpExecError("Can't detach at least one DRBD from old node")
4142

    
4143
    # if we managed to detach at least one, we update all the disks of
4144
    # the instance to point to the new secondary
4145
    info("updating instance configuration")
4146
    for dev, _, new_logical_id in iv_names.itervalues():
4147
      dev.logical_id = new_logical_id
4148
      cfg.SetDiskID(dev, pri_node)
4149
    cfg.Update(instance)
4150
    # we can remove now the temp minors as now the new values are
4151
    # written to the config file (and therefore stable)
4152
    self.cfg.ReleaseDRBDMinors(instance.name)
4153

    
4154
    # and now perform the drbd attach
4155
    info("attaching primary drbds to new secondary (standalone => connected)")
4156
    failures = []
4157
    for dev in instance.disks:
4158
      info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4159
      # since the attach is smart, it's enough to 'find' the device,
4160
      # it will automatically activate the network, if the physical_id
4161
      # is correct
4162
      cfg.SetDiskID(dev, pri_node)
4163
      logging.debug("Disk to attach: %s", dev)
4164
      if not self.rpc.call_blockdev_find(pri_node, dev):
4165
        warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4166
                "please do a gnt-instance info to see the status of disks")
4167

    
4168
    # this can fail as the old devices are degraded and _WaitForSync
4169
    # does a combined result over all disks, so we don't check its
4170
    # return value
4171
    self.proc.LogStep(5, steps_total, "sync devices")
4172
    _WaitForSync(self, instance, unlock=True)
4173

    
4174
    # so check manually all the devices
4175
    for name, (dev, old_lvs, _) in iv_names.iteritems():
4176
      cfg.SetDiskID(dev, pri_node)
4177
      is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5]
4178
      if is_degr:
4179
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
4180

    
4181
    self.proc.LogStep(6, steps_total, "removing old storage")
4182
    for name, (dev, old_lvs, _) in iv_names.iteritems():
4183
      info("remove logical volumes for %s" % name)
4184
      for lv in old_lvs:
4185
        cfg.SetDiskID(lv, old_node)
4186
        if not self.rpc.call_blockdev_remove(old_node, lv):
4187
          warning("Can't remove LV on old secondary",
4188
                  hint="Cleanup stale volumes by hand")
4189

    
4190
  def Exec(self, feedback_fn):
4191
    """Execute disk replacement.
4192

4193
    This dispatches the disk replacement to the appropriate handler.
4194

4195
    """
4196
    instance = self.instance
4197

    
4198
    # Activate the instance disks if we're replacing them on a down instance
4199
    if instance.status == "down":
4200
      _StartInstanceDisks(self, instance, True)
4201

    
4202
    if instance.disk_template == constants.DT_DRBD8:
4203
      if self.op.remote_node is None:
4204
        fn = self._ExecD8DiskOnly
4205
      else:
4206
        fn = self._ExecD8Secondary
4207
    else:
4208
      raise errors.ProgrammerError("Unhandled disk replacement case")
4209

    
4210
    ret = fn(feedback_fn)
4211

    
4212
    # Deactivate the instance disks if we're replacing them on a down instance
4213
    if instance.status == "down":
4214
      _SafeShutdownInstanceDisks(self, instance)
4215

    
4216
    return ret
4217

    
4218

    
4219
class LUGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk", "amount"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    self.instance = instance

    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing.")

    if instance.FindDisk(self.op.disk) is None:
      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                 (self.op.disk, instance.name))

    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                       instance.hypervisor)
    for node in nodenames:
      info = nodeinfo.get(node, None)
      if not info:
        raise errors.OpPrereqError("Cannot get current information"
                                   " from node '%s'" % node)
      vg_free = info.get('vg_free', None)
      if not isinstance(vg_free, int):
        raise errors.OpPrereqError("Can't compute free disk space on"
                                   " node %s" % node)
      if self.op.amount > info['vg_free']:
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                   " %d MiB available, %d MiB required" %
                                   (node, info['vg_free'], self.op.amount))

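  # Illustrative note (not part of the original code; names and values are
  # made up): the loop above requires at least self.op.amount MiB of vg_free
  # on *every* node holding the disk, since for DRBD8 the backing LV must
  # grow on the primary and the secondary alike. Going by the usual LU/opcode
  # pairing, a matching request would carry exactly the _OP_REQP fields, e.g.
  #
  #   op = opcodes.OpGrowDisk(instance_name="inst1.example.com",
  #                           disk="sda", amount=1024)
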
  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    disk = instance.FindDisk(self.op.disk)
    for node in (instance.secondary_nodes + (instance.primary_node,)):
      self.cfg.SetDiskID(disk, node)
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
      if (not result or not isinstance(result, (list, tuple)) or
          len(result) != 2):
        raise errors.OpExecError("grow request failed to node %s" % node)
      elif not result[0]:
        raise errors.OpExecError("grow request failed to node %s: %s" %
                                 (node, result[1]))
    disk.RecordGrow(self.op.amount)
    self.cfg.Update(instance)
    return


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = self.cfg.ExpandInstanceName(name)
        if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" %
                                     name)
        self.wanted_names.append(full_name)
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
    else:
      self.wanted_names = None
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]
    return

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = self.rpc.call_instance_info(instance.primary_node,
                                                instance.name,
                                                instance.hypervisor)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "vcpus": instance.vcpus,
        "hypervisor": instance.hypervisor,
        }

      htkind = instance.hypervisor
      if htkind == constants.HT_XEN_PVM:
        idict["kernel_path"] = instance.kernel_path
        idict["initrd_path"] = instance.initrd_path

      if htkind == constants.HT_XEN_HVM:
        idict["hvm_boot_order"] = instance.hvm_boot_order
        idict["hvm_acpi"] = instance.hvm_acpi
        idict["hvm_pae"] = instance.hvm_pae
        idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
        idict["hvm_nic_type"] = instance.hvm_nic_type
        idict["hvm_disk_type"] = instance.hvm_disk_type

      if htkind in constants.HTS_REQ_PORT:
        if instance.vnc_bind_address is None:
          vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
        else:
          vnc_bind_address = instance.vnc_bind_address
        if instance.network_port is None:
          vnc_console_port = None
        elif vnc_bind_address == constants.BIND_ADDRESS_GLOBAL:
          vnc_console_port = "%s:%s" % (instance.primary_node,
                                        instance.network_port)
        elif vnc_bind_address == constants.LOCALHOST_IP_ADDRESS:
          vnc_console_port = "%s:%s on node %s" % (vnc_bind_address,
                                                   instance.network_port,
                                                   instance.primary_node)
        else:
          vnc_console_port = "%s:%s" % (instance.vnc_bind_address,
                                        instance.network_port)
        idict["vnc_console_port"] = vnc_console_port
        idict["vnc_bind_address"] = vnc_bind_address
        idict["network_port"] = instance.network_port

      result[instance.name] = idict

    return result


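# Illustrative sketch of the mapping returned by LUQueryInstanceData.Exec
# above; all field values below are made up, and the "disks" entries follow
# the _ComputeDiskStatus structure:
#
#   {"inst1.example.com": {"name": "inst1.example.com",
#                          "config_state": "up", "run_state": "up",
#                          "pnode": "node1.example.com",
#                          "snodes": ["node2.example.com"],
#                          "os": "debian-etch", "memory": 512, "vcpus": 1,
#                          "nics": [("aa:00:00:11:22:33", "192.0.2.10",
#                                    "xen-br0")],
#                          "disks": [{"iv_name": "sda", ...}],
#                          "hypervisor": ...}}
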
class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "hvparams"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge or self.mac:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      if self.mac:
        mac = self.mac
      else:
        mac = self.instance.nics[0].mac
      args['nics'] = [(ip, bridge, mac)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.cfg.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    # FIXME: all the parameters could be checked before, in ExpandNames, or in
    # a separate CheckArguments function, if we implement one, so the operation
    # can be aborted without waiting for any lock, should it have an error...
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.force = getattr(self.op, "force", None)
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac]
    if all_parms.count(None) == len(all_parms) and not self.op.hvparams:
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    # checking the new params on the primary/secondary nodes

    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    pnode = self.instance.primary_node
    nodelist = [pnode]
    nodelist.extend(instance.secondary_nodes)

    if self.op.hvparams:
      i_hvdict = copy.deepcopy(instance.hvparams)
      for key, val in self.op.hvparams.iteritems():
        if val is None:
          try:
            del i_hvdict[key]
          except KeyError:
            pass
        else:
          i_hvdict[key] = val
      cluster = self.cfg.GetClusterInfo()
      hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
                                i_hvdict)
      # local check
      hypervisor.GetHypervisor(
        instance.hypervisor).CheckParameterSyntax(hv_new)
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
      self.hv_new = hv_new

    self.warn = []
    if self.mem is not None and not self.force:
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
                                                  instance.hypervisor)
      nodeinfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
                                         instance.hypervisor)

      if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s" % pnode)
      else:
        if instance_info:
          current_mem = instance_info['memory']
        else:
          # Assume instance not running
          # (there is a slight race condition here, but it's not very probable,
          # and we have no other way to check)
          current_mem = 0
        miss_mem = self.mem - current_mem - nodeinfo[pnode]['memory_free']
        if miss_mem > 0:
          raise errors.OpPrereqError("This change will prevent the instance"
                                     " from starting, due to %d MB of memory"
                                     " missing on its primary node" % miss_mem)

      for node in instance.secondary_nodes:
        if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
          self.warn.append("Can't get info from secondary node %s" % node)
        elif self.mem > nodeinfo[node]['memory_free']:
          self.warn.append("Not enough memory to failover instance to"
                           " secondary node %s" % node)

    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.
    """
    # Process here the warnings from CheckPrereq, as we don't have a
    # feedback_fn there.
    for warn in self.warn:
      feedback_fn("WARNING: %s" % warn)

    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.op.hvparams:
      instance.hvparams = self.hv_new
      for key, val in self.op.hvparams.iteritems():
        result.append(("hv/%s" % key, val))

    self.cfg.Update(instance)

    return result


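# Illustrative note (values made up): LUSetInstanceParams.Exec above returns
# the list of (parameter, new value) pairs that were actually applied, which
# callers can show to the user, e.g.:
#
#   [("mem", 512), ("vcpus", 2), ("bridge", "xen-br1")]
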
class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = ['nodes']
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return self.rpc.call_export_list(self.nodes)


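# Illustrative sketch (names made up) of the node -> export-list mapping
# returned by LUQueryExports.Exec above:
#
#   {"node1.example.com": ["inst1.example.com", "inst2.example.com"],
#    "node2.example.com": []}
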
class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
          "Cannot retrieve locked instance %s" % self.op.instance_name

    self.dst_node = self.cfg.GetNodeInfo(
      self.cfg.ExpandNodeName(self.op.target_node))

    assert self.dst_node is not None, \
          "Cannot retrieve locked node %s" % self.op.target_node

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not self.rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not self.rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(self, instance)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    cluster_name = self.cfg.GetClusterName()
    for dev in snap_disks:
      if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                           instance, cluster_name):
        logger.Error("could not export block device %s from node %s to node %s"
                     % (dev.logical_id[1], src_node, dst_node.name))
      if not self.rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from node %s" %
                     (dev.logical_id[1], src_node))

    if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not self.rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.
    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    exportlist = self.rpc.call_export_list(self.acquired_locks[
      locking.LEVEL_NODE])
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
        if not self.rpc.call_export_remove(node, instance_name):
          logger.Error("could not remove export for instance %s"
                       " on node %s" % (instance_name, node))

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the list of (path, tag) matches.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets one or more tags on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tags.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tags from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


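# Illustrative note (tags and names made up): LUSearchTags.Exec above returns
# (path, tag) pairs for every cluster, node or instance tag matching the
# pattern, e.g. for the pattern "^prod":
#
#   [("/cluster", "production"), ("/instances/inst1.example.com", "prod-db")]
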
class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, lu, mode, name, **kwargs):
    self.lu = lu
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

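  # Illustrative sketch (mirroring LUTestAllocator.Exec at the end of this
  # module; all values are made up): building and running an allocation
  # request for a new two-disk DRBD8 instance would look roughly like
  #
  #   ial = IAllocator(lu, mode=constants.IALLOCATOR_MODE_ALLOC,
  #                    name="inst1.example.com", mem_size=512, vcpus=1,
  #                    os="debian-etch", tags=[],
  #                    nics=[{"mac": "aa:00:00:11:22:33", "ip": None,
  #                           "bridge": "xen-br0"}],
  #                    disks=[{"size": 1024, "mode": "w"},
  #                           {"size": 4096, "mode": "w"}],
  #                    disk_template=constants.DT_DRBD8)
  #   ial.Run("my-allocator-script")  # hypothetical allocator name
  #   if ial.success:
  #     target_nodes = ial.nodes
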
  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.lu.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": 1,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enable_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }

    i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()
    # FIXME: here we have only one hypervisor information, but
    # instance can belong to different hypervisors
    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
                                           cfg.GetHypervisorType())
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      if nname not in node_data or not isinstance(node_data[nname], dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname]
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += iinfo.memory
          if iinfo.status == "up":
            i_p_up_mem += iinfo.memory

      # compute memory used by instances
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": iinfo.vcpus,
        "memory": iinfo.memory,
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template,
                                  self.disks[0]["size"], self.disks[1]["size"])

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

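  # Illustrative sketch (values made up) of the "request" entry that
  # _AddNewInstance above stores in self.in_data before serialization;
  # "disk_space_total" is whatever _ComputeDiskSize returns for the template:
  #
  #   {"type": "allocate", "name": "inst1.example.com", "required_nodes": 2,
  #    "memory": 512, "vcpus": 1, "os": "debian-etch", "tags": [],
  #    "disk_template": ...,
  #    "disks": [{"size": 1024, "mode": "w"}, {"size": 4096, "mode": "w"}],
  #    "disk_space_total": ...,
  #    "nics": [{"mac": "aa:00:00:11:22:33", "ip": None, "bridge": "xen-br0"}]}
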
  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.lu.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1

    disk_space = _ComputeDiskSize(instance.disk_template,
                                  instance.disks[0].size,
                                  instance.disks[1].size)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.lu.rpc.call_iallocator_runner
    data = self.in_text

    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)

    if not isinstance(result, (list, tuple)) or len(result) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict


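# Illustrative sketch (values made up) of the reply an external allocator
# script is expected to produce, as checked by IAllocator._ValidateResult
# above: a serialized dict with at least "success", "info" and "nodes", the
# latter being the list of chosen node names, e.g.
#
#   {"success": true, "info": "allocation successful",
#    "nodes": ["node2.example.com", "node3.example.com"]}
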
class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    of the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       )
    else:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result