# lib/cmdlib.py @ revision 9854f5d0

#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform
import logging
import copy
import random

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer
from ganeti import ssconf


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    self.CheckArguments()

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensuring the
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods no longer need to worry about missing parameters.

    """
    pass

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not be prefixed with 'GANETI_', as this will
    be handled in the hooks runner. Also note that additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    If the hook should run on no nodes, an empty list (and not None)
    should be returned.

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instances' nodes, or
    to just lock primary or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]


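# Illustrative sketch (not part of the original module): a minimal concurrent
# LU following the rules above. The opcode it would handle and its behaviour
# are hypothetical; it only shows which methods a subclass must provide and
# how the locking helpers defined above are typically combined.
#
#   class LUExampleInstanceNoop(LogicalUnit):
#     HPATH = "instance-example"
#     HTYPE = constants.HTYPE_INSTANCE
#     _OP_REQP = ["instance_name"]
#     REQ_BGL = False
#
#     def ExpandNames(self):
#       # expand self.op.instance_name and declare its lock
#       self._ExpandAndLockInstance()
#       # node locks are computed later, once the instance lock is held
#       self.needed_locks[locking.LEVEL_NODE] = []
#       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
#
#     def DeclareLocks(self, level):
#       if level == locking.LEVEL_NODE:
#         self._LockInstancesNodes()
#
#     def CheckPrereq(self):
#       self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
#
#     def BuildHooksEnv(self):
#       env = _BuildInstanceHookEnvByObject(self, self.instance)
#       nl = [self.cfg.GetMasterNode(), self.instance.primary_node]
#       return env, nl, nl
#
#     def Exec(self, feedback_fn):
#       feedback_fn("Nothing to do for %s" % self.instance.name)
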
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.OpPrereqError: if the nodes parameter is wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose names are to be expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
  return wanted


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta))


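# Illustrative sketch (not part of the original module): how an LU's
# CheckPrereq would typically use _CheckOutputFields. The field names used
# here are hypothetical examples.
#
#   self.static_fields = utils.FieldSet("name", "pinst_cnt", "sinst_cnt")
#   self.dynamic_fields = utils.FieldSet("dtotal", "dfree", "mtotal", "mfree")
#   _CheckOutputFields(static=self.static_fields,
#                      dynamic=self.dynamic_fields,
#                      selected=self.op.output_fields)
#   # raises OpPrereqError("Unknown output fields selected: ...") if
#   # self.op.output_fields contains anything outside the two sets
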
def _CheckBooleanOpField(op, name):
  """Validates boolean opcode parameters.

  This will ensure that an opcode parameter is either a boolean value,
  or None (but that it always exists).

  """
  val = getattr(op, name, None)
  if not (val is None or isinstance(val, bool)):
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
                               (name, str(val)))
  setattr(op, name, val)


def _CheckNodeOnline(lu, node):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is offline

  """
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("Can't use offline node %s" % node)


def _CheckNodeNotDrained(lu, node):
  """Ensure that a given node is not drained.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is drained

  """
  if lu.cfg.GetNodeInfo(node).drained:
    raise errors.OpPrereqError("Can't use drained node %s" % node)


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, bridge, mac) representing
      the NICs the instance has
  @rtype: dict
  @return: the hook environment for this instance

  """
  if status:
    str_status = "up"
  else:
    str_status = "down"
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env


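# Illustrative example (not part of the original module): for a hypothetical
# instance with one NIC, a call such as
#
#   _BuildInstanceHookEnv("inst1.example.com", "node1.example.com",
#                         ["node2.example.com"], "debian-etch", True,
#                         512, 1,
#                         [("198.51.100.10", "xen-br0", "aa:00:00:12:34:56")])
#
# would return roughly:
#
#   {
#     "OP_TARGET": "inst1.example.com",
#     "INSTANCE_NAME": "inst1.example.com",
#     "INSTANCE_PRIMARY": "node1.example.com",
#     "INSTANCE_SECONDARIES": "node2.example.com",
#     "INSTANCE_OS_TYPE": "debian-etch",
#     "INSTANCE_STATUS": "up",
#     "INSTANCE_MEMORY": 512,
#     "INSTANCE_VCPUS": 1,
#     "INSTANCE_NIC0_IP": "198.51.100.10",
#     "INSTANCE_NIC0_BRIDGE": "xen-br0",
#     "INSTANCE_NIC0_HWADDR": "aa:00:00:12:34:56",
#     "INSTANCE_NIC_COUNT": 1,
#   }
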
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.admin_up,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _AdjustCandidatePool(lu):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool()
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               ", ".join(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def _CheckInstanceBridgesExist(lu, instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
  result.Raise()
  if not result.data:
    raise errors.OpPrereqError("One or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise()
    if not result.data:
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
                  node_result, feedback_fn, master_files,
                  drbd_map):
    """Run multiple tests against a node.

    Test list:

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type nodeinfo: L{objects.Node}
    @param nodeinfo: the node to check
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @param node_result: the results from the node
    @param feedback_fn: function used to accumulate results
    @param master_files: list of files that only masters should have
    @param drbd_map: the used drbd minors for this node, in the
        form of minor: (instance, must_exist), which correspond to instances
        and their running status

    """
    node = nodeinfo.name

    # main result, node_result should be a non-empty dict
    if not node_result or not isinstance(node_result, dict):
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
      return True

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = node_result.get('version', None)
    if not (remote_version and isinstance(remote_version, (list, tuple)) and
            len(remote_version) == 2):
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version[0]:
      feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
                  " node %s %s" % (local_version, node, remote_version[0]))
      return True

    # node seems compatible, we can actually try to look into its results

    bad = False

    # full package version
    if constants.RELEASE_VERSION != remote_version[1]:
      feedback_fn("  - WARNING: software version mismatch: master %s,"
                  " node %s %s" %
                  (constants.RELEASE_VERSION, node, remote_version[1]))

    # checks vg existence and size > 20G

    vglist = node_result.get(constants.NV_VGLIST, None)
    if not vglist:
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                      (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum

    remote_cksum = node_result.get(constants.NV_FILELIST, None)
    if not isinstance(remote_cksum, dict):
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      for file_name in file_list:
        node_is_mc = nodeinfo.master_candidate
        must_have_file = file_name not in master_files
        if file_name not in remote_cksum:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
          else:
            # not candidate and this is not a must-have file
            bad = True
            feedback_fn("  - ERROR: non master-candidate has old/wrong file"
                        " '%s'" % file_name)
        else:
          # all good, except non-master/non-must have combination
          if not node_is_mc and not must_have_file:
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
                        " candidates" % file_name)

    # checks ssh to any

    if constants.NV_NODELIST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result[constants.NV_NODELIST]:
        bad = True
        for node in node_result[constants.NV_NODELIST]:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result[constants.NV_NODELIST][node]))

    if constants.NV_NODENETTEST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result[constants.NV_NODENETTEST]:
        bad = True
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result[constants.NV_NODENETTEST][node]))

    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))

    # check used drbd list
    used_minors = node_result.get(constants.NV_DRBDLIST, [])
    if not isinstance(used_minors, (tuple, list)):
      feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
                  str(used_minors))
    else:
      for minor, (iname, must_exist) in drbd_map.items():
        if minor not in used_minors and must_exist:
          feedback_fn("  - ERROR: drbd minor %d of instance %s is not active" %
                      (minor, iname))
          bad = True
      for minor in used_minors:
        if minor not in drbd_map:
          feedback_fn("  - ERROR: unallocated drbd minor %d is in use" % minor)
          bad = True

    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn, n_offline):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      if node in n_offline:
        # ignore missing volumes on offline nodes
        continue
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if instanceconfig.admin_up:
      if ((node_current not in node_instance or
          not instance in node_instance[node_current]) and
          node_current not in n_offline):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to, should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to
      # start them in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

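  # Worked example (illustrative, hypothetical numbers): assume node B is
  # secondary for two auto-balanced instances whose primary is node A, with
  # BE_MEMORY values of 512 and 1024. If node A fails, node B must be able to
  # start both, i.e. it needs 1536 MiB free. With nodeinfo['mfree'] == 1024
  # the check above reports "not enough memory on node B to accommodate
  # failovers should node A fail" and the cluster is flagged as not N+1
  # compliant.
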
  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase, and their failure causes
    the output to be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
                        for iname in instancelist)
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    n_offline = [] # List of offline nodes
    n_drained = [] # List of nodes being drained
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    master_files = [constants.CLUSTER_CONF_FILE]

    file_names = ssconf.SimpleStore().GetFileList()
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.RAPI_CERT_FILE)
    file_names.extend(master_files)

    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    node_verify_param = {
      constants.NV_FILELIST: file_names,
      constants.NV_NODELIST: [node.name for node in nodeinfo
                              if not node.offline],
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                  node.secondary_ip) for node in nodeinfo
                                 if not node.offline],
      constants.NV_LVLIST: vg_name,
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VGLIST: None,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_DRBDLIST: None,
      }
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())

    cluster = self.cfg.GetClusterInfo()
    master_node = self.cfg.GetMasterNode()
    all_drbd_map = self.cfg.ComputeDRBDMap()

    for node_i in nodeinfo:
      node = node_i.name
      nresult = all_nvinfo[node].data

      if node_i.offline:
        feedback_fn("* Skipping offline node %s" % (node,))
        n_offline.append(node)
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained.append(node)
      else:
        ntype = "regular"
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      if all_nvinfo[node].failed or not isinstance(nresult, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_drbd = {}
      for minor, instance in all_drbd_map[node].items():
        instance = instanceinfo[instance]
        node_drbd[minor] = (instance.name, instance.admin_up)
      result = self._VerifyNode(node_i, file_names, local_checksums,
                                nresult, feedback_fn, master_files,
                                node_drbd)
      bad = bad or result

      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
      if isinstance(lvdata, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, utils.SafeEncode(lvdata)))
        bad = True
        node_volume[node] = {}
      elif not isinstance(lvdata, dict):
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = lvdata

      # node_instance
      idata = nresult.get(constants.NV_INSTANCELIST, None)
      if not isinstance(idata, list):
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
                    (node,))
        bad = True
        continue

      node_instance[node] = idata

      # node_info
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = instanceinfo[instance]
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn, n_offline)
      bad = bad or result
      inst_nodes_offline = []

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      elif pnode not in n_offline:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      if pnode in n_offline:
        inst_nodes_offline.append(pnode)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        elif snode not in n_offline:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))
          bad = True
        if snode in n_offline:
          inst_nodes_offline.append(snode)

      if inst_nodes_offline:
        # warn that the instance lives on offline nodes, and set bad=True
        feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
                    ", ".join(inst_nodes_offline))
        bad = True

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res.failed or res.data is False or not isinstance(res.data, list):
            if res.offline:
              # no need to warn or set fail return value
              continue
            feedback_fn("    Communication failure in hooks execution")
            lu_result = 1
            continue
          for script, hkr, output in res.data:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (not inst.admin_up or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
      # node_volume
      lvs = node_lvs[node]
      if lvs.failed:
        if not lvs.offline:
          self.LogWarning("Connection to node %s failed: %s" %
                          (node, lvs.data))
        continue
      lvs = lvs.data
      if isinstance(lvs, basestring):
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logging.warning("Connection to node %s failed or invalid data"
                        " returned", node)
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


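# Illustrative example (not part of the original module): the tuple returned
# by LUVerifyDisks.Exec has the shape (res_nodes, res_nlvm, res_instances,
# res_missing). With hypothetical node and instance names it could look like:
#
#   (["node3.example.com"],                    # nodes with failed/invalid data
#    {"node4.example.com": "lvm binaries not found"},  # per-node LVM errors
#    ["inst2.example.com"],                    # instances with offline LVs
#    {"inst5.example.com": [("node1.example.com", "xenvg/disk0")]})
#                                              # instances with missing LVs
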
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    if result.failed or not result.data:
      raise errors.OpExecError("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetNodeList()
      try:
        node_list.remove(master)
      except ValueError:
        pass
      result = self.rpc.call_upload_file(node_list,
                                         constants.SSH_KNOWN_HOSTS_FILE)
      for to_node, to_result in result.iteritems():
        if to_result.failed or not to_result.data:
          logging.error("Copy of file %s to node %s failed",
                        constants.SSH_KNOWN_HOSTS_FILE, to_node)

    finally:
      result = self.rpc.call_node_start_master(master, False)
      if result.failed or not result.data:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV


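# Illustrative sketch (not part of the original module): for a DRBD8 disk
# whose children are two logical volumes (data and metadata), e.g.
#
#   lv_data = objects.Disk(dev_type=constants.LD_LV, size=1024,
#                          logical_id=("xenvg", "data-lv"))
#   lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
#                          logical_id=("xenvg", "meta-lv"))
#   drbd = objects.Disk(dev_type=constants.LD_DRBD8, size=1024,
#                       children=[lv_data, lv_meta])
#
# _RecursiveCheckIfLVMBased(drbd) descends into the children, finds an LD_LV
# leaf and returns True, which is what makes LUSetClusterParams below refuse
# to disable LVM storage while such instances exist.
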
class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def CheckParameters(self):
    """Check parameters

    """
    if not hasattr(self.op, "candidate_pool_size"):
      self.op.candidate_pool_size = None
    if self.op.candidate_pool_size is not None:
      try:
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
                                   str(err))
      if self.op.candidate_pool_size < 1:
        raise errors.OpPrereqError("At least one master candidate needed")

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        if vglist[node].failed:
          # ignoring down node
          self.LogWarning("Node %s unreachable/error, ignoring" % node)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate beparams changes
    if self.op.beparams:
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.FillDict(
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)

    # hypervisor list/parameters
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      if self.op.vg_name != self.cfg.GetVGName():
        self.cfg.SetVGName(self.op.vg_name)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.enabled_hypervisors is not None:
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size

    self.cfg.Update(self.cluster)

    # we want to update nodes after the cluster so that if any errors
    # happen, we have recorded and saved the cluster info
    if self.op.candidate_pool_size is not None:
      _AdjustCandidatePool(self)


1503
class LURedistributeConfig(NoHooksLU):
1504
  """Force the redistribution of cluster configuration.
1505

1506
  This is a very simple LU.
1507

1508
  """
1509
  _OP_REQP = []
1510
  REQ_BGL = False
1511

    
1512
  def ExpandNames(self):
1513
    self.needed_locks = {
1514
      locking.LEVEL_NODE: locking.ALL_SET,
1515
    }
1516
    self.share_locks[locking.LEVEL_NODE] = 1
1517

    
1518
  def CheckPrereq(self):
1519
    """Check prerequisites.
1520

1521
    """
1522

    
1523
  def Exec(self, feedback_fn):
1524
    """Redistribute the configuration.
1525

1526
    """
1527
    self.cfg.Update(self.cfg.GetClusterInfo())
1528

    
1529

    
1530
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1531
  """Sleep and poll for an instance's disk to sync.
1532

1533
  """
1534
  if not instance.disks:
1535
    return True
1536

    
1537
  if not oneshot:
1538
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1539

    
1540
  node = instance.primary_node
1541

    
1542
  for dev in instance.disks:
1543
    lu.cfg.SetDiskID(dev, node)
1544

    
1545
  retries = 0
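  # poll the primary node for mirror status; if 10 consecutive polls fail
  # (with a 6 second pause in between) the operation is aborted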
1546
  while True:
1547
    max_time = 0
1548
    done = True
1549
    cumul_degraded = False
1550
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1551
    if rstats.failed or not rstats.data:
1552
      lu.LogWarning("Can't get any data from node %s", node)
1553
      retries += 1
1554
      if retries >= 10:
1555
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1556
                                 " aborting." % node)
1557
      time.sleep(6)
1558
      continue
1559
    rstats = rstats.data
1560
    retries = 0
1561
    for i, mstat in enumerate(rstats):
1562
      if mstat is None:
1563
        lu.LogWarning("Can't compute data for node %s/%s",
1564
                           node, instance.disks[i].iv_name)
1565
        continue
1566
      # we ignore the ldisk parameter
1567
      perc_done, est_time, is_degraded, _ = mstat
1568
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1569
      if perc_done is not None:
1570
        done = False
1571
        if est_time is not None:
1572
          rem_time = "%d estimated seconds remaining" % est_time
1573
          max_time = est_time
1574
        else:
1575
          rem_time = "no time estimate"
1576
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1577
                        (instance.disks[i].iv_name, perc_done, rem_time))
1578
    if done or oneshot:
1579
      break
1580

    
1581
    time.sleep(min(60, max_time))
1582

    
1583
  if done:
1584
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1585
  return not cumul_degraded
1586

    
1587

    
1588
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1589
  """Check that mirrors are not degraded.
1590

1591
  The ldisk parameter, if True, will change the test from the
1592
  is_degraded attribute (which represents overall non-ok status for
1593
  the device(s)) to the ldisk (representing the local storage status).
1594

1595
  """
1596
  lu.cfg.SetDiskID(dev, node)
1597
  if ldisk:
1598
    idx = 6
1599
  else:
1600
    idx = 5
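  # idx selects the field of the blockdev_find result checked below:
  # index 5 is the overall is_degraded flag, index 6 the ldisk (local
  # storage) status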
1601

    
1602
  result = True
1603
  if on_primary or dev.AssembleOnSecondary():
1604
    rstats = lu.rpc.call_blockdev_find(node, dev)
1605
    msg = rstats.RemoteFailMsg()
1606
    if msg:
1607
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1608
      result = False
1609
    elif not rstats.payload:
1610
      lu.LogWarning("Can't find disk on node %s", node)
1611
      result = False
1612
    else:
1613
      result = result and (not rstats.payload[idx])
1614
  if dev.children:
1615
    for child in dev.children:
1616
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1617

    
1618
  return result
1619

    
1620

    
1621
class LUDiagnoseOS(NoHooksLU):
1622
  """Logical unit for OS diagnose/query.
1623

1624
  """
1625
  _OP_REQP = ["output_fields", "names"]
1626
  REQ_BGL = False
1627
  _FIELDS_STATIC = utils.FieldSet()
1628
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1629

    
1630
  def ExpandNames(self):
1631
    if self.op.names:
1632
      raise errors.OpPrereqError("Selective OS query not supported")
1633

    
1634
    _CheckOutputFields(static=self._FIELDS_STATIC,
1635
                       dynamic=self._FIELDS_DYNAMIC,
1636
                       selected=self.op.output_fields)
1637

    
1638
    # Lock all nodes, in shared mode
1639
    self.needed_locks = {}
1640
    self.share_locks[locking.LEVEL_NODE] = 1
1641
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1642

    
1643
  def CheckPrereq(self):
1644
    """Check prerequisites.
1645

1646
    """
1647

    
1648
  @staticmethod
1649
  def _DiagnoseByOS(node_list, rlist):
1650
    """Remaps a per-node return list into a per-os per-node dictionary
1651

1652
    @param node_list: a list with the names of all nodes
1653
    @param rlist: a map with node names as keys and OS objects as values
1654

1655
    @rtype: dict
1656
    @returns: a dictionary with osnames as keys and as value another map, with
1657
        nodes as keys and list of OS objects as values, eg::
1658

1659
          {"debian-etch": {"node1": [<object>,...],
1660
                           "node2": [<object>,]}
1661
          }
1662

1663
    """
1664
    all_os = {}
1665
    for node_name, nr in rlist.iteritems():
1666
      if nr.failed or not nr.data:
1667
        continue
1668
      for os_obj in nr.data:
1669
        if os_obj.name not in all_os:
1670
          # build a list of nodes for this os containing empty lists
1671
          # for each node in node_list
1672
          all_os[os_obj.name] = {}
1673
          for nname in node_list:
1674
            all_os[os_obj.name][nname] = []
1675
        all_os[os_obj.name][node_name].append(os_obj)
1676
    return all_os
1677

    
1678
  def Exec(self, feedback_fn):
1679
    """Compute the list of OSes.
1680

1681
    """
1682
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1683
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()
1684
                   if node in node_list]
1685
    node_data = self.rpc.call_os_diagnose(valid_nodes)
1686
    if node_data == False:
1687
      raise errors.OpExecError("Can't gather the list of OSes")
1688
    pol = self._DiagnoseByOS(valid_nodes, node_data)
1689
    output = []
1690
    for os_name, os_data in pol.iteritems():
1691
      row = []
1692
      for field in self.op.output_fields:
1693
        if field == "name":
1694
          val = os_name
1695
        elif field == "valid":
1696
          val = utils.all([osl and osl[0] for osl in os_data.values()])
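          # an OS is considered valid only if every node has at least one
          # entry for it (nodes that did not report it keep the empty list
          # pre-seeded in _DiagnoseByOS)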
1697
        elif field == "node_status":
1698
          val = {}
1699
          for node_name, nos_list in os_data.iteritems():
1700
            val[node_name] = [(v.status, v.path) for v in nos_list]
1701
        else:
1702
          raise errors.ParameterError(field)
1703
        row.append(val)
1704
      output.append(row)
1705

    
1706
    return output
1707

    
1708

    
1709
class LURemoveNode(LogicalUnit):
1710
  """Logical unit for removing a node.
1711

1712
  """
1713
  HPATH = "node-remove"
1714
  HTYPE = constants.HTYPE_NODE
1715
  _OP_REQP = ["node_name"]
1716

    
1717
  def BuildHooksEnv(self):
1718
    """Build hooks env.
1719

1720
    This doesn't run on the target node in the pre phase as a failed
1721
    node would then be impossible to remove.
1722

1723
    """
1724
    env = {
1725
      "OP_TARGET": self.op.node_name,
1726
      "NODE_NAME": self.op.node_name,
1727
      }
1728
    all_nodes = self.cfg.GetNodeList()
1729
    all_nodes.remove(self.op.node_name)
1730
    return env, all_nodes, all_nodes
1731

    
1732
  def CheckPrereq(self):
1733
    """Check prerequisites.
1734

1735
    This checks:
1736
     - the node exists in the configuration
1737
     - it does not have primary or secondary instances
1738
     - it's not the master
1739

1740
    Any errors are signalled by raising errors.OpPrereqError.
1741

1742
    """
1743
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1744
    if node is None:
1745
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1746

    
1747
    instance_list = self.cfg.GetInstanceList()
1748

    
1749
    masternode = self.cfg.GetMasterNode()
1750
    if node.name == masternode:
1751
      raise errors.OpPrereqError("Node is the master node,"
1752
                                 " you need to failover first.")
1753

    
1754
    for instance_name in instance_list:
1755
      instance = self.cfg.GetInstanceInfo(instance_name)
1756
      if node.name in instance.all_nodes:
1757
        raise errors.OpPrereqError("Instance %s is still running on the node,"
1758
                                   " please remove first." % instance_name)
1759
    self.op.node_name = node.name
1760
    self.node = node
1761

    
1762
  def Exec(self, feedback_fn):
1763
    """Removes the node from the cluster.
1764

1765
    """
1766
    node = self.node
1767
    logging.info("Stopping the node daemon and removing configs from node %s",
1768
                 node.name)
1769

    
1770
    self.context.RemoveNode(node.name)
1771

    
1772
    self.rpc.call_node_leave_cluster(node.name)
1773

    
1774
    # Promote nodes to master candidate as needed
1775
    _AdjustCandidatePool(self)
1776

    
1777

    
1778
class LUQueryNodes(NoHooksLU):
1779
  """Logical unit for querying nodes.
1780

1781
  """
1782
  _OP_REQP = ["output_fields", "names", "use_locking"]
1783
  REQ_BGL = False
1784
  _FIELDS_DYNAMIC = utils.FieldSet(
1785
    "dtotal", "dfree",
1786
    "mtotal", "mnode", "mfree",
1787
    "bootid",
1788
    "ctotal", "cnodes", "csockets",
1789
    )
1790

    
1791
  _FIELDS_STATIC = utils.FieldSet(
1792
    "name", "pinst_cnt", "sinst_cnt",
1793
    "pinst_list", "sinst_list",
1794
    "pip", "sip", "tags",
1795
    "serial_no",
1796
    "master_candidate",
1797
    "master",
1798
    "offline",
1799
    "drained",
1800
    )
1801

    
1802
  def ExpandNames(self):
1803
    _CheckOutputFields(static=self._FIELDS_STATIC,
1804
                       dynamic=self._FIELDS_DYNAMIC,
1805
                       selected=self.op.output_fields)
1806

    
1807
    self.needed_locks = {}
1808
    self.share_locks[locking.LEVEL_NODE] = 1
1809

    
1810
    if self.op.names:
1811
      self.wanted = _GetWantedNodes(self, self.op.names)
1812
    else:
1813
      self.wanted = locking.ALL_SET
1814

    
1815
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
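    # do_node_query is true when at least one of the requested fields is not
    # in the static set, i.e. it has to be collected live from the nodes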
1816
    self.do_locking = self.do_node_query and self.op.use_locking
1817
    if self.do_locking:
1818
      # if we don't request only static fields, we need to lock the nodes
1819
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
1820

    
1821

    
1822
  def CheckPrereq(self):
1823
    """Check prerequisites.
1824

1825
    """
1826
    # The validation of the node list is done in the _GetWantedNodes,
1827
    # if non empty, and if empty, there's no validation to do
1828
    pass
1829

    
1830
  def Exec(self, feedback_fn):
1831
    """Computes the list of nodes and their attributes.
1832

1833
    """
1834
    all_info = self.cfg.GetAllNodesInfo()
1835
    if self.do_locking:
1836
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1837
    elif self.wanted != locking.ALL_SET:
1838
      nodenames = self.wanted
1839
      missing = set(nodenames).difference(all_info.keys())
1840
      if missing:
1841
        raise errors.OpExecError(
1842
          "Some nodes were removed before retrieving their data: %s" % missing)
1843
    else:
1844
      nodenames = all_info.keys()
1845

    
1846
    nodenames = utils.NiceSort(nodenames)
1847
    nodelist = [all_info[name] for name in nodenames]
1848

    
1849
    # begin data gathering
1850

    
1851
    if self.do_node_query:
1852
      live_data = {}
1853
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1854
                                          self.cfg.GetHypervisorType())
1855
      for name in nodenames:
1856
        nodeinfo = node_data[name]
1857
        if not nodeinfo.failed and nodeinfo.data:
1858
          nodeinfo = nodeinfo.data
1859
          fn = utils.TryConvert
1860
          live_data[name] = {
1861
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1862
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1863
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
1864
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1865
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
1866
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1867
            "bootid": nodeinfo.get('bootid', None),
1868
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
1869
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
1870
            }
1871
        else:
1872
          live_data[name] = {}
1873
    else:
1874
      live_data = dict.fromkeys(nodenames, {})
1875

    
1876
    node_to_primary = dict([(name, set()) for name in nodenames])
1877
    node_to_secondary = dict([(name, set()) for name in nodenames])
1878

    
1879
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1880
                             "sinst_cnt", "sinst_list"))
1881
    if inst_fields & frozenset(self.op.output_fields):
1882
      instancelist = self.cfg.GetInstanceList()
1883

    
1884
      for instance_name in instancelist:
1885
        inst = self.cfg.GetInstanceInfo(instance_name)
1886
        if inst.primary_node in node_to_primary:
1887
          node_to_primary[inst.primary_node].add(inst.name)
1888
        for secnode in inst.secondary_nodes:
1889
          if secnode in node_to_secondary:
1890
            node_to_secondary[secnode].add(inst.name)
1891

    
1892
    master_node = self.cfg.GetMasterNode()
1893

    
1894
    # end data gathering
1895

    
1896
    output = []
1897
    for node in nodelist:
1898
      node_output = []
1899
      for field in self.op.output_fields:
1900
        if field == "name":
1901
          val = node.name
1902
        elif field == "pinst_list":
1903
          val = list(node_to_primary[node.name])
1904
        elif field == "sinst_list":
1905
          val = list(node_to_secondary[node.name])
1906
        elif field == "pinst_cnt":
1907
          val = len(node_to_primary[node.name])
1908
        elif field == "sinst_cnt":
1909
          val = len(node_to_secondary[node.name])
1910
        elif field == "pip":
1911
          val = node.primary_ip
1912
        elif field == "sip":
1913
          val = node.secondary_ip
1914
        elif field == "tags":
1915
          val = list(node.GetTags())
1916
        elif field == "serial_no":
1917
          val = node.serial_no
1918
        elif field == "master_candidate":
1919
          val = node.master_candidate
1920
        elif field == "master":
1921
          val = node.name == master_node
1922
        elif field == "offline":
1923
          val = node.offline
1924
        elif field == "drained":
1925
          val = node.drained
1926
        elif self._FIELDS_DYNAMIC.Matches(field):
1927
          val = live_data[node.name].get(field, None)
1928
        else:
1929
          raise errors.ParameterError(field)
1930
        node_output.append(val)
1931
      output.append(node_output)
1932

    
1933
    return output
1934

    
1935

    
1936
class LUQueryNodeVolumes(NoHooksLU):
1937
  """Logical unit for getting volumes on node(s).
1938

1939
  """
1940
  _OP_REQP = ["nodes", "output_fields"]
1941
  REQ_BGL = False
1942
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1943
  _FIELDS_STATIC = utils.FieldSet("node")
1944

    
1945
  def ExpandNames(self):
1946
    _CheckOutputFields(static=self._FIELDS_STATIC,
1947
                       dynamic=self._FIELDS_DYNAMIC,
1948
                       selected=self.op.output_fields)
1949

    
1950
    self.needed_locks = {}
1951
    self.share_locks[locking.LEVEL_NODE] = 1
1952
    if not self.op.nodes:
1953
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1954
    else:
1955
      self.needed_locks[locking.LEVEL_NODE] = \
1956
        _GetWantedNodes(self, self.op.nodes)
1957

    
1958
  def CheckPrereq(self):
1959
    """Check prerequisites.
1960

1961
    This checks that the fields required are valid output fields.
1962

1963
    """
1964
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1965

    
1966
  def Exec(self, feedback_fn):
1967
    """Computes the list of nodes and their attributes.
1968

1969
    """
1970
    nodenames = self.nodes
1971
    volumes = self.rpc.call_node_volumes(nodenames)
1972

    
1973
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1974
             in self.cfg.GetInstanceList()]
1975

    
1976
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1977

    
1978
    output = []
1979
    for node in nodenames:
1980
      if node not in volumes or volumes[node].failed or not volumes[node].data:
1981
        continue
1982

    
1983
      node_vols = volumes[node].data[:]
1984
      node_vols.sort(key=lambda vol: vol['dev'])
1985

    
1986
      for vol in node_vols:
1987
        node_output = []
1988
        for field in self.op.output_fields:
1989
          if field == "node":
1990
            val = node
1991
          elif field == "phys":
1992
            val = vol['dev']
1993
          elif field == "vg":
1994
            val = vol['vg']
1995
          elif field == "name":
1996
            val = vol['name']
1997
          elif field == "size":
1998
            val = int(float(vol['size']))
1999
          elif field == "instance":
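            # find which instance (if any) owns this LV on this node; the
            # for/else falls through to '-' when no instance matches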
2000
            for inst in ilist:
2001
              if node not in lv_by_node[inst]:
2002
                continue
2003
              if vol['name'] in lv_by_node[inst][node]:
2004
                val = inst.name
2005
                break
2006
            else:
2007
              val = '-'
2008
          else:
2009
            raise errors.ParameterError(field)
2010
          node_output.append(str(val))
2011

    
2012
        output.append(node_output)
2013

    
2014
    return output
2015

    
2016

    
2017
class LUAddNode(LogicalUnit):
2018
  """Logical unit for adding node to the cluster.
2019

2020
  """
2021
  HPATH = "node-add"
2022
  HTYPE = constants.HTYPE_NODE
2023
  _OP_REQP = ["node_name"]
2024

    
2025
  def BuildHooksEnv(self):
2026
    """Build hooks env.
2027

2028
    This will run on all nodes before, and on all nodes + the new node after.
2029

2030
    """
2031
    env = {
2032
      "OP_TARGET": self.op.node_name,
2033
      "NODE_NAME": self.op.node_name,
2034
      "NODE_PIP": self.op.primary_ip,
2035
      "NODE_SIP": self.op.secondary_ip,
2036
      }
2037
    nodes_0 = self.cfg.GetNodeList()
2038
    nodes_1 = nodes_0 + [self.op.node_name, ]
2039
    return env, nodes_0, nodes_1
2040

    
2041
  def CheckPrereq(self):
2042
    """Check prerequisites.
2043

2044
    This checks:
2045
     - the new node is not already in the config
2046
     - it is resolvable
2047
     - its parameters (single/dual homed) match the cluster
2048

2049
    Any errors are signalled by raising errors.OpPrereqError.
2050

2051
    """
2052
    node_name = self.op.node_name
2053
    cfg = self.cfg
2054

    
2055
    dns_data = utils.HostInfo(node_name)
2056

    
2057
    node = dns_data.name
2058
    primary_ip = self.op.primary_ip = dns_data.ip
2059
    secondary_ip = getattr(self.op, "secondary_ip", None)
2060
    if secondary_ip is None:
2061
      secondary_ip = primary_ip
2062
    if not utils.IsValidIP(secondary_ip):
2063
      raise errors.OpPrereqError("Invalid secondary IP given")
2064
    self.op.secondary_ip = secondary_ip
2065

    
2066
    node_list = cfg.GetNodeList()
2067
    if not self.op.readd and node in node_list:
2068
      raise errors.OpPrereqError("Node %s is already in the configuration" %
2069
                                 node)
2070
    elif self.op.readd and node not in node_list:
2071
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2072

    
2073
    for existing_node_name in node_list:
2074
      existing_node = cfg.GetNodeInfo(existing_node_name)
2075

    
2076
      if self.op.readd and node == existing_node_name:
2077
        if (existing_node.primary_ip != primary_ip or
2078
            existing_node.secondary_ip != secondary_ip):
2079
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
2080
                                     " address configuration as before")
2081
        continue
2082

    
2083
      if (existing_node.primary_ip == primary_ip or
2084
          existing_node.secondary_ip == primary_ip or
2085
          existing_node.primary_ip == secondary_ip or
2086
          existing_node.secondary_ip == secondary_ip):
2087
        raise errors.OpPrereqError("New node ip address(es) conflict with"
2088
                                   " existing node %s" % existing_node.name)
2089

    
2090
    # check that the type of the node (single versus dual homed) is the
2091
    # same as for the master
2092
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2093
    master_singlehomed = myself.secondary_ip == myself.primary_ip
2094
    newbie_singlehomed = secondary_ip == primary_ip
2095
    if master_singlehomed != newbie_singlehomed:
2096
      if master_singlehomed:
2097
        raise errors.OpPrereqError("The master has no private ip but the"
2098
                                   " new node has one")
2099
      else:
2100
        raise errors.OpPrereqError("The master has a private ip but the"
2101
                                   " new node doesn't have one")
2102

    
2103
    # checks reachability
2104
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2105
      raise errors.OpPrereqError("Node not reachable by ping")
2106

    
2107
    if not newbie_singlehomed:
2108
      # check reachability from my secondary ip to newbie's secondary ip
2109
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2110
                           source=myself.secondary_ip):
2111
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2112
                                   " based ping to noded port")
2113

    
2114
    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2115
    mc_now, _ = self.cfg.GetMasterCandidateStats()
2116
    master_candidate = mc_now < cp_size
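    # the new node is auto-promoted to master candidate whenever the current
    # candidate pool is below the configured candidate_pool_size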
2117

    
2118
    self.new_node = objects.Node(name=node,
2119
                                 primary_ip=primary_ip,
2120
                                 secondary_ip=secondary_ip,
2121
                                 master_candidate=master_candidate,
2122
                                 offline=False, drained=False)
2123

    
2124
  def Exec(self, feedback_fn):
2125
    """Adds the new node to the cluster.
2126

2127
    """
2128
    new_node = self.new_node
2129
    node = new_node.name
2130

    
2131
    # check connectivity
2132
    result = self.rpc.call_version([node])[node]
2133
    result.Raise()
2134
    if result.data:
2135
      if constants.PROTOCOL_VERSION == result.data:
2136
        logging.info("Communication to node %s fine, sw version %s match",
2137
                     node, result.data)
2138
      else:
2139
        raise errors.OpExecError("Version mismatch master version %s,"
2140
                                 " node version %s" %
2141
                                 (constants.PROTOCOL_VERSION, result.data))
2142
    else:
2143
      raise errors.OpExecError("Cannot get version from the new node")
2144

    
2145
    # setup ssh on node
2146
    logging.info("Copy ssh key to node %s", node)
2147
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2148
    keyarray = []
2149
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2150
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2151
                priv_key, pub_key]
2152

    
2153
    for i in keyfiles:
2154
      f = open(i, 'r')
2155
      try:
2156
        keyarray.append(f.read())
2157
      finally:
2158
        f.close()
2159

    
2160
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2161
                                    keyarray[2],
2162
                                    keyarray[3], keyarray[4], keyarray[5])
2163

    
2164
    msg = result.RemoteFailMsg()
2165
    if msg:
2166
      raise errors.OpExecError("Cannot transfer ssh keys to the"
2167
                               " new node: %s" % msg)
2168

    
2169
    # Add node to our /etc/hosts, and add key to known_hosts
2170
    utils.AddHostToEtcHosts(new_node.name)
2171

    
2172
    if new_node.secondary_ip != new_node.primary_ip:
2173
      result = self.rpc.call_node_has_ip_address(new_node.name,
2174
                                                 new_node.secondary_ip)
2175
      if result.failed or not result.data:
2176
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2177
                                 " you gave (%s). Please fix and re-run this"
2178
                                 " command." % new_node.secondary_ip)
2179

    
2180
    node_verify_list = [self.cfg.GetMasterNode()]
2181
    node_verify_param = {
2182
      'nodelist': [node],
2183
      # TODO: do a node-net-test as well?
2184
    }
2185

    
2186
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2187
                                       self.cfg.GetClusterName())
2188
    for verifier in node_verify_list:
2189
      if result[verifier].failed or not result[verifier].data:
2190
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
2191
                                 " for remote verification" % verifier)
2192
      if result[verifier].data['nodelist']:
2193
        for failed in result[verifier].data['nodelist']:
2194
          feedback_fn("ssh/hostname verification failed %s -> %s" %
2195
                      (verifier, result[verifier].data['nodelist'][failed]))
2196
        raise errors.OpExecError("ssh/hostname verification failed.")
2197

    
2198
    # Distribute updated /etc/hosts and known_hosts to all nodes,
2199
    # including the node just added
2200
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2201
    dist_nodes = self.cfg.GetNodeList()
2202
    if not self.op.readd:
2203
      dist_nodes.append(node)
2204
    if myself.name in dist_nodes:
2205
      dist_nodes.remove(myself.name)
2206

    
2207
    logging.debug("Copying hosts and known_hosts to all nodes")
2208
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2209
      result = self.rpc.call_upload_file(dist_nodes, fname)
2210
      for to_node, to_result in result.iteritems():
2211
        if to_result.failed or not to_result.data:
2212
          logging.error("Copy of file %s to node %s failed", fname, to_node)
2213

    
2214
    to_copy = []
2215
    enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2216
    if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
2217
      to_copy.append(constants.VNC_PASSWORD_FILE)
2218

    
2219
    for fname in to_copy:
2220
      result = self.rpc.call_upload_file([node], fname)
2221
      if result[node].failed or not result[node].data:
2222
        logging.error("Could not copy file %s to node %s", fname, node)
2223

    
2224
    if self.op.readd:
2225
      self.context.ReaddNode(new_node)
2226
    else:
2227
      self.context.AddNode(new_node)
2228

    
2229

    
2230
class LUSetNodeParams(LogicalUnit):
2231
  """Modifies the parameters of a node.
2232

2233
  """
2234
  HPATH = "node-modify"
2235
  HTYPE = constants.HTYPE_NODE
2236
  _OP_REQP = ["node_name"]
2237
  REQ_BGL = False
2238

    
2239
  def CheckArguments(self):
2240
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
2241
    if node_name is None:
2242
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2243
    self.op.node_name = node_name
2244
    _CheckBooleanOpField(self.op, 'master_candidate')
2245
    _CheckBooleanOpField(self.op, 'offline')
2246
    _CheckBooleanOpField(self.op, 'drained')
2247
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2248
    if all_mods.count(None) == 3:
2249
      raise errors.OpPrereqError("Please pass at least one modification")
2250
    if all_mods.count(True) > 1:
2251
      raise errors.OpPrereqError("Can't set the node into more than one"
2252
                                 " state at the same time")
2253
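    # Illustrative note (hypothetical invocations, not part of the LU):
    # passing only offline=True is accepted, while offline=True together
    # with drained=True trips the check above, since a node may be put into
    # at most one of these special states per operation.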

    
2254
  def ExpandNames(self):
2255
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2256

    
2257
  def BuildHooksEnv(self):
2258
    """Build hooks env.
2259

2260
    This runs on the master node.
2261

2262
    """
2263
    env = {
2264
      "OP_TARGET": self.op.node_name,
2265
      "MASTER_CANDIDATE": str(self.op.master_candidate),
2266
      "OFFLINE": str(self.op.offline),
2267
      "DRAINED": str(self.op.drained),
2268
      }
2269
    nl = [self.cfg.GetMasterNode(),
2270
          self.op.node_name]
2271
    return env, nl, nl
2272

    
2273
  def CheckPrereq(self):
2274
    """Check prerequisites.
2275

2276
    This only checks the instance list against the existing names.
2277

2278
    """
2279
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2280

    
2281
    if ((self.op.master_candidate == False or self.op.offline == True or
2282
         self.op.drained == True) and node.master_candidate):
2283
      # we will demote the node from master_candidate
2284
      if self.op.node_name == self.cfg.GetMasterNode():
2285
        raise errors.OpPrereqError("The master node has to be a"
2286
                                   " master candidate, online and not drained")
2287
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2288
      num_candidates, _ = self.cfg.GetMasterCandidateStats()
2289
      if num_candidates <= cp_size:
2290
        msg = ("Not enough master candidates (desired"
2291
               " %d, new value will be %d)" % (cp_size, num_candidates-1))
2292
        if self.op.force:
2293
          self.LogWarning(msg)
2294
        else:
2295
          raise errors.OpPrereqError(msg)
2296

    
2297
    if (self.op.master_candidate == True and
2298
        ((node.offline and not self.op.offline == False) or
2299
         (node.drained and not self.op.drained == False))):
2300
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2301
                                 " to master_candidate" % node.name)
2302

    
2303
    return
2304

    
2305
  def Exec(self, feedback_fn):
2306
    """Modifies a node.
2307

2308
    """
2309
    node = self.node
2310

    
2311
    result = []
2312
    changed_mc = False
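    # every change applied below is recorded as a (name, value) pair in
    # 'result', which is returned at the end so the caller can report what
    # was actually modified, including the automatic demotions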
2313

    
2314
    if self.op.offline is not None:
2315
      node.offline = self.op.offline
2316
      result.append(("offline", str(self.op.offline)))
2317
      if self.op.offline == True:
2318
        if node.master_candidate:
2319
          node.master_candidate = False
2320
          changed_mc = True
2321
          result.append(("master_candidate", "auto-demotion due to offline"))
2322
        if node.drained:
2323
          node.drained = False
2324
          result.append(("drained", "clear drained status due to offline"))
2325

    
2326
    if self.op.master_candidate is not None:
2327
      node.master_candidate = self.op.master_candidate
2328
      changed_mc = True
2329
      result.append(("master_candidate", str(self.op.master_candidate)))
2330
      if self.op.master_candidate == False:
2331
        rrc = self.rpc.call_node_demote_from_mc(node.name)
2332
        msg = rrc.RemoteFailMsg()
2333
        if msg:
2334
          self.LogWarning("Node failed to demote itself: %s" % msg)
2335

    
2336
    if self.op.drained is not None:
2337
      node.drained = self.op.drained
2338
      result.append(("drained", str(self.op.drained)))
2339
      if self.op.drained == True:
2340
        if node.master_candidate:
2341
          node.master_candidate = False
2342
          changed_mc = True
2343
          result.append(("master_candidate", "auto-demotion due to drain"))
2344
        if node.offline:
2345
          node.offline = False
2346
          result.append(("offline", "clear offline status due to drain"))
2347

    
2348
    # this will trigger configuration file update, if needed
2349
    self.cfg.Update(node)
2350
    # this will trigger job queue propagation or cleanup
2351
    if changed_mc:
2352
      self.context.ReaddNode(node)
2353

    
2354
    return result
2355

    
2356

    
2357
class LUQueryClusterInfo(NoHooksLU):
2358
  """Query cluster configuration.
2359

2360
  """
2361
  _OP_REQP = []
2362
  REQ_BGL = False
2363

    
2364
  def ExpandNames(self):
2365
    self.needed_locks = {}
2366

    
2367
  def CheckPrereq(self):
2368
    """No prerequisites needed for this LU.
2369

2370
    """
2371
    pass
2372

    
2373
  def Exec(self, feedback_fn):
2374
    """Return cluster config.
2375

2376
    """
2377
    cluster = self.cfg.GetClusterInfo()
2378
    result = {
2379
      "software_version": constants.RELEASE_VERSION,
2380
      "protocol_version": constants.PROTOCOL_VERSION,
2381
      "config_version": constants.CONFIG_VERSION,
2382
      "os_api_version": constants.OS_API_VERSION,
2383
      "export_version": constants.EXPORT_VERSION,
2384
      "architecture": (platform.architecture()[0], platform.machine()),
2385
      "name": cluster.cluster_name,
2386
      "master": cluster.master_node,
2387
      "default_hypervisor": cluster.default_hypervisor,
2388
      "enabled_hypervisors": cluster.enabled_hypervisors,
2389
      "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2390
                        for hypervisor in cluster.enabled_hypervisors]),
2391
      "beparams": cluster.beparams,
2392
      "candidate_pool_size": cluster.candidate_pool_size,
2393
      }
2394

    
2395
    return result
2396

    
2397

    
2398
class LUQueryConfigValues(NoHooksLU):
2399
  """Return configuration values.
2400

2401
  """
2402
  _OP_REQP = []
2403
  REQ_BGL = False
2404
  _FIELDS_DYNAMIC = utils.FieldSet()
2405
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2406

    
2407
  def ExpandNames(self):
2408
    self.needed_locks = {}
2409

    
2410
    _CheckOutputFields(static=self._FIELDS_STATIC,
2411
                       dynamic=self._FIELDS_DYNAMIC,
2412
                       selected=self.op.output_fields)
2413

    
2414
  def CheckPrereq(self):
2415
    """No prerequisites.
2416

2417
    """
2418
    pass
2419

    
2420
  def Exec(self, feedback_fn):
2421
    """Return the values of the requested configuration fields.
2422

2423
    """
2424
    values = []
2425
    for field in self.op.output_fields:
2426
      if field == "cluster_name":
2427
        entry = self.cfg.GetClusterName()
2428
      elif field == "master_node":
2429
        entry = self.cfg.GetMasterNode()
2430
      elif field == "drain_flag":
2431
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2432
      else:
2433
        raise errors.ParameterError(field)
2434
      values.append(entry)
2435
    return values
2436

    
2437

    
2438
class LUActivateInstanceDisks(NoHooksLU):
2439
  """Bring up an instance's disks.
2440

2441
  """
2442
  _OP_REQP = ["instance_name"]
2443
  REQ_BGL = False
2444

    
2445
  def ExpandNames(self):
2446
    self._ExpandAndLockInstance()
2447
    self.needed_locks[locking.LEVEL_NODE] = []
2448
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2449

    
2450
  def DeclareLocks(self, level):
2451
    if level == locking.LEVEL_NODE:
2452
      self._LockInstancesNodes()
2453

    
2454
  def CheckPrereq(self):
2455
    """Check prerequisites.
2456

2457
    This checks that the instance is in the cluster.
2458

2459
    """
2460
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2461
    assert self.instance is not None, \
2462
      "Cannot retrieve locked instance %s" % self.op.instance_name
2463
    _CheckNodeOnline(self, self.instance.primary_node)
2464

    
2465
  def Exec(self, feedback_fn):
2466
    """Activate the disks.
2467

2468
    """
2469
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2470
    if not disks_ok:
2471
      raise errors.OpExecError("Cannot activate block devices")
2472

    
2473
    return disks_info
2474

    
2475

    
2476
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2477
  """Prepare the block devices for an instance.
2478

2479
  This sets up the block devices on all nodes.
2480

2481
  @type lu: L{LogicalUnit}
2482
  @param lu: the logical unit on whose behalf we execute
2483
  @type instance: L{objects.Instance}
2484
  @param instance: the instance for whose disks we assemble
2485
  @type ignore_secondaries: boolean
2486
  @param ignore_secondaries: if true, errors on secondary nodes
2487
      won't result in an error return from the function
2488
  @return: False if the operation failed, otherwise a list of
2489
      (host, instance_visible_name, node_visible_name)
2490
      with the mapping from node devices to instance devices
2491

2492
  """
2493
  device_info = []
2494
  disks_ok = True
2495
  iname = instance.name
2496
  # With the two passes mechanism we try to reduce the window of
2497
  # opportunity for the race condition of switching DRBD to primary
2498
  # before handshaking occured, but we do not eliminate it
2499

    
2500
  # The proper fix would be to wait (with some limits) until the
2501
  # connection has been made and drbd transitions from WFConnection
2502
  # into any other network-connected state (Connected, SyncTarget,
2503
  # SyncSource, etc.)
2504

    
2505
  # 1st pass, assemble on all nodes in secondary mode
2506
  for inst_disk in instance.disks:
2507
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2508
      lu.cfg.SetDiskID(node_disk, node)
2509
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2510
      msg = result.RemoteFailMsg()
2511
      if msg:
2512
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2513
                           " (is_primary=False, pass=1): %s",
2514
                           inst_disk.iv_name, node, msg)
2515
        if not ignore_secondaries:
2516
          disks_ok = False
2517

    
2518
  # FIXME: race condition on drbd migration to primary
2519

    
2520
  # 2nd pass, do only the primary node
2521
  for inst_disk in instance.disks:
2522
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2523
      if node != instance.primary_node:
2524
        continue
2525
      lu.cfg.SetDiskID(node_disk, node)
2526
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2527
      msg = result.RemoteFailMsg()
2528
      if msg:
2529
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2530
                           " (is_primary=True, pass=2): %s",
2531
                           inst_disk.iv_name, node, msg)
2532
        disks_ok = False
2533
    device_info.append((instance.primary_node, inst_disk.iv_name,
2534
                        result.payload))
2535

    
2536
  # leave the disks configured for the primary node
2537
  # this is a workaround that would be fixed better by
2538
  # improving the logical/physical id handling
2539
  for disk in instance.disks:
2540
    lu.cfg.SetDiskID(disk, instance.primary_node)
2541

    
2542
  return disks_ok, device_info
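
# A minimal usage sketch (illustrative only, not invoked from here): callers
# are expected to pair _AssembleInstanceDisks with _ShutdownInstanceDisks on
# failure, e.g.
#
#   disks_ok, _ = _AssembleInstanceDisks(lu, instance)
#   if not disks_ok:
#     _ShutdownInstanceDisks(lu, instance)
#
# which is the pattern wrapped by _StartInstanceDisks below.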
2543

    
2544

    
2545
def _StartInstanceDisks(lu, instance, force):
2546
  """Start the disks of an instance.
2547

2548
  """
2549
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2550
                                           ignore_secondaries=force)
2551
  if not disks_ok:
2552
    _ShutdownInstanceDisks(lu, instance)
2553
    if force is not None and not force:
2554
      lu.proc.LogWarning("", hint="If the message above refers to a"
2555
                         " secondary node,"
2556
                         " you can retry the operation using '--force'.")
2557
    raise errors.OpExecError("Disk consistency error")
2558

    
2559

    
2560
class LUDeactivateInstanceDisks(NoHooksLU):
2561
  """Shutdown an instance's disks.
2562

2563
  """
2564
  _OP_REQP = ["instance_name"]
2565
  REQ_BGL = False
2566

    
2567
  def ExpandNames(self):
2568
    self._ExpandAndLockInstance()
2569
    self.needed_locks[locking.LEVEL_NODE] = []
2570
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2571

    
2572
  def DeclareLocks(self, level):
2573
    if level == locking.LEVEL_NODE:
2574
      self._LockInstancesNodes()
2575

    
2576
  def CheckPrereq(self):
2577
    """Check prerequisites.
2578

2579
    This checks that the instance is in the cluster.
2580

2581
    """
2582
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2583
    assert self.instance is not None, \
2584
      "Cannot retrieve locked instance %s" % self.op.instance_name
2585

    
2586
  def Exec(self, feedback_fn):
2587
    """Deactivate the disks
2588

2589
    """
2590
    instance = self.instance
2591
    _SafeShutdownInstanceDisks(self, instance)
2592

    
2593

    
2594
def _SafeShutdownInstanceDisks(lu, instance):
2595
  """Shutdown block devices of an instance.
2596

2597
  This function checks if an instance is running, before calling
2598
  _ShutdownInstanceDisks.
2599

2600
  """
2601
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
2602
                                      [instance.hypervisor])
2603
  ins_l = ins_l[instance.primary_node]
2604
  if ins_l.failed or not isinstance(ins_l.data, list):
2605
    raise errors.OpExecError("Can't contact node '%s'" %
2606
                             instance.primary_node)
2607

    
2608
  if instance.name in ins_l.data:
2609
    raise errors.OpExecError("Instance is running, can't shutdown"
2610
                             " block devices.")
2611

    
2612
  _ShutdownInstanceDisks(lu, instance)
2613

    
2614

    
2615
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2616
  """Shutdown block devices of an instance.
2617

2618
  This does the shutdown on all nodes of the instance.
2619

2620
  If ignore_primary is false, errors on the primary node are not
2621
  ignored (they make the function return failure).
2622

2623
  """
2624
  all_result = True
2625
  for disk in instance.disks:
2626
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2627
      lu.cfg.SetDiskID(top_disk, node)
2628
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2629
      msg = result.RemoteFailMsg()
2630
      if msg:
2631
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2632
                      disk.iv_name, node, msg)
2633
        if not ignore_primary or node != instance.primary_node:
2634
          all_result = False
2635
  return all_result
2636

    
2637

    
2638
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2639
  """Checks if a node has enough free memory.
2640

2641
  This function checks if a given node has the needed amount of free
2642
  memory. In case the node has less memory or we cannot get the
2643
  information from the node, this function raises an OpPrereqError
2644
  exception.
2645

2646
  @type lu: C{LogicalUnit}
2647
  @param lu: a logical unit from which we get configuration data
2648
  @type node: C{str}
2649
  @param node: the node to check
2650
  @type reason: C{str}
2651
  @param reason: string to use in the error message
2652
  @type requested: C{int}
2653
  @param requested: the amount of memory in MiB to check for
2654
  @type hypervisor_name: C{str}
2655
  @param hypervisor_name: the hypervisor to ask for memory stats
2656
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2657
      we cannot check the node
2658

2659
  """
2660
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
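  # call_node_info returns a per-node result map; Raise() below turns any
  # RPC-level failure for our single node into an exception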
2661
  nodeinfo[node].Raise()
2662
  free_mem = nodeinfo[node].data.get('memory_free')
2663
  if not isinstance(free_mem, int):
2664
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2665
                             " was '%s'" % (node, free_mem))
2666
  if requested > free_mem:
2667
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2668
                             " needed %s MiB, available %s MiB" %
2669
                             (node, reason, requested, free_mem))
2670

    
2671

    
2672
class LUStartupInstance(LogicalUnit):
2673
  """Starts an instance.
2674

2675
  """
2676
  HPATH = "instance-start"
2677
  HTYPE = constants.HTYPE_INSTANCE
2678
  _OP_REQP = ["instance_name", "force"]
2679
  REQ_BGL = False
2680

    
2681
  def ExpandNames(self):
2682
    self._ExpandAndLockInstance()
2683

    
2684
  def BuildHooksEnv(self):
2685
    """Build hooks env.
2686

2687
    This runs on master, primary and secondary nodes of the instance.
2688

2689
    """
2690
    env = {
2691
      "FORCE": self.op.force,
2692
      }
2693
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2694
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2695
    return env, nl, nl
2696

    
2697
  def CheckPrereq(self):
2698
    """Check prerequisites.
2699

2700
    This checks that the instance is in the cluster.
2701

2702
    """
2703
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2704
    assert self.instance is not None, \
2705
      "Cannot retrieve locked instance %s" % self.op.instance_name
2706

    
2707
    _CheckNodeOnline(self, instance.primary_node)
2708

    
2709
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2710
    # check bridge existence
2711
    _CheckInstanceBridgesExist(self, instance)
2712

    
2713
    _CheckNodeFreeMemory(self, instance.primary_node,
2714
                         "starting instance %s" % instance.name,
2715
                         bep[constants.BE_MEMORY], instance.hypervisor)
2716

    
2717
  def Exec(self, feedback_fn):
2718
    """Start the instance.
2719

2720
    """
2721
    instance = self.instance
2722
    force = self.op.force
2723
    extra_args = getattr(self.op, "extra_args", "")
2724

    
2725
    self.cfg.MarkInstanceUp(instance.name)
2726

    
2727
    node_current = instance.primary_node
2728

    
2729
    _StartInstanceDisks(self, instance, force)
2730

    
2731
    result = self.rpc.call_instance_start(node_current, instance, extra_args)
2732
    msg = result.RemoteFailMsg()
2733
    if msg:
2734
      _ShutdownInstanceDisks(self, instance)
2735
      raise errors.OpExecError("Could not start instance: %s" % msg)
2736

    
2737

    
2738
class LURebootInstance(LogicalUnit):
2739
  """Reboot an instance.
2740

2741
  """
2742
  HPATH = "instance-reboot"
2743
  HTYPE = constants.HTYPE_INSTANCE
2744
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2745
  REQ_BGL = False
2746

    
2747
  def ExpandNames(self):
2748
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2749
                                   constants.INSTANCE_REBOOT_HARD,
2750
                                   constants.INSTANCE_REBOOT_FULL]:
2751
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2752
                                  (constants.INSTANCE_REBOOT_SOFT,
2753
                                   constants.INSTANCE_REBOOT_HARD,
2754
                                   constants.INSTANCE_REBOOT_FULL))
2755
    self._ExpandAndLockInstance()
2756

    
2757
  def BuildHooksEnv(self):
2758
    """Build hooks env.
2759

2760
    This runs on master, primary and secondary nodes of the instance.
2761

2762
    """
2763
    env = {
2764
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2765
      }
2766
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2767
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2768
    return env, nl, nl
2769

    
2770
  def CheckPrereq(self):
2771
    """Check prerequisites.
2772

2773
    This checks that the instance is in the cluster.
2774

2775
    """
2776
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2777
    assert self.instance is not None, \
2778
      "Cannot retrieve locked instance %s" % self.op.instance_name
2779

    
2780
    _CheckNodeOnline(self, instance.primary_node)
2781

    
2782
    # check bridges existance
2783
    _CheckInstanceBridgesExist(self, instance)
2784

    
2785
  def Exec(self, feedback_fn):
2786
    """Reboot the instance.
2787

2788
    """
2789
    instance = self.instance
2790
    ignore_secondaries = self.op.ignore_secondaries
2791
    reboot_type = self.op.reboot_type
2792
    extra_args = getattr(self.op, "extra_args", "")
2793

    
2794
    node_current = instance.primary_node
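    # soft and hard reboots are delegated to the hypervisor on the primary
    # node; a full reboot is emulated below as shutdown, disk restart and
    # start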
2795

    
2796
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2797
                       constants.INSTANCE_REBOOT_HARD]:
2798
      for disk in instance.disks:
2799
        self.cfg.SetDiskID(disk, node_current)
2800
      result = self.rpc.call_instance_reboot(node_current, instance,
2801
                                             reboot_type, extra_args)
2802
      msg = result.RemoteFailMsg()
2803
      if msg:
2804
        raise errors.OpExecError("Could not reboot instance: %s" % msg)
2805
    else:
2806
      result = self.rpc.call_instance_shutdown(node_current, instance)
2807
      msg = result.RemoteFailMsg()
2808
      if msg:
2809
        raise errors.OpExecError("Could not shutdown instance for"
2810
                                 " full reboot: %s" % msg)
2811
      _ShutdownInstanceDisks(self, instance)
2812
      _StartInstanceDisks(self, instance, ignore_secondaries)
2813
      result = self.rpc.call_instance_start(node_current, instance, extra_args)
2814
      msg = result.RemoteFailMsg()
2815
      if msg:
2816
        _ShutdownInstanceDisks(self, instance)
2817
        raise errors.OpExecError("Could not start instance for"
2818
                                 " full reboot: %s" % msg)
2819

    
2820
    self.cfg.MarkInstanceUp(instance.name)
2821

    
2822

    
2823
class LUShutdownInstance(LogicalUnit):
2824
  """Shutdown an instance.
2825

2826
  """
2827
  HPATH = "instance-stop"
2828
  HTYPE = constants.HTYPE_INSTANCE
2829
  _OP_REQP = ["instance_name"]
2830
  REQ_BGL = False
2831

    
2832
  def ExpandNames(self):
2833
    self._ExpandAndLockInstance()
2834

    
2835
  def BuildHooksEnv(self):
2836
    """Build hooks env.
2837

2838
    This runs on master, primary and secondary nodes of the instance.
2839

2840
    """
2841
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2842
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2843
    return env, nl, nl
2844

    
2845
  def CheckPrereq(self):
2846
    """Check prerequisites.
2847

2848
    This checks that the instance is in the cluster.
2849

2850
    """
2851
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2852
    assert self.instance is not None, \
2853
      "Cannot retrieve locked instance %s" % self.op.instance_name
2854
    _CheckNodeOnline(self, self.instance.primary_node)
2855

    
2856
  def Exec(self, feedback_fn):
2857
    """Shutdown the instance.
2858

2859
    """
2860
    instance = self.instance
2861
    node_current = instance.primary_node
2862
    self.cfg.MarkInstanceDown(instance.name)
2863
    result = self.rpc.call_instance_shutdown(node_current, instance)
2864
    msg = result.RemoteFailMsg()
2865
    if msg:
2866
      self.proc.LogWarning("Could not shutdown instance: %s" % msg)
2867

    
2868
    _ShutdownInstanceDisks(self, instance)
2869

    
2870

    
2871
class LUReinstallInstance(LogicalUnit):
2872
  """Reinstall an instance.
2873

2874
  """
2875
  HPATH = "instance-reinstall"
2876
  HTYPE = constants.HTYPE_INSTANCE
2877
  _OP_REQP = ["instance_name"]
2878
  REQ_BGL = False
2879

    
2880
  def ExpandNames(self):
2881
    self._ExpandAndLockInstance()
2882

    
2883
  def BuildHooksEnv(self):
2884
    """Build hooks env.
2885

2886
    This runs on master, primary and secondary nodes of the instance.
2887

2888
    """
2889
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2890
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2891
    return env, nl, nl
2892

    
2893
  def CheckPrereq(self):
2894
    """Check prerequisites.
2895

2896
    This checks that the instance is in the cluster and is not running.
2897

2898
    """
2899
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2900
    assert instance is not None, \
2901
      "Cannot retrieve locked instance %s" % self.op.instance_name
2902
    _CheckNodeOnline(self, instance.primary_node)
2903

    
2904
    if instance.disk_template == constants.DT_DISKLESS:
2905
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2906
                                 self.op.instance_name)
2907
    if instance.admin_up:
2908
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2909
                                 self.op.instance_name)
2910
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2911
                                              instance.name,
2912
                                              instance.hypervisor)
2913
    if remote_info.failed or remote_info.data:
2914
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2915
                                 (self.op.instance_name,
2916
                                  instance.primary_node))
2917

    
2918
    self.op.os_type = getattr(self.op, "os_type", None)
2919
    if self.op.os_type is not None:
2920
      # OS verification
2921
      pnode = self.cfg.GetNodeInfo(
2922
        self.cfg.ExpandNodeName(instance.primary_node))
2923
      if pnode is None:
2924
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2925
                                   instance.primary_node)
2926
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2927
      result.Raise()
2928
      if not isinstance(result.data, objects.OS):
2929
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2930
                                   " primary node"  % self.op.os_type)
2931

    
2932
    self.instance = instance
2933

    
2934
  def Exec(self, feedback_fn):
2935
    """Reinstall the instance.
2936

2937
    """
2938
    inst = self.instance
2939

    
2940
    if self.op.os_type is not None:
2941
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2942
      inst.os = self.op.os_type
2943
      self.cfg.Update(inst)
2944

    
2945
    _StartInstanceDisks(self, inst, None)
2946
    try:
2947
      feedback_fn("Running the instance OS create scripts...")
2948
      result = self.rpc.call_instance_os_add(inst.primary_node, inst)
2949
      msg = result.RemoteFailMsg()
2950
      if msg:
2951
        raise errors.OpExecError("Could not install OS for instance %s"
2952
                                 " on node %s: %s" %
2953
                                 (inst.name, inst.primary_node, msg))
2954
    finally:
2955
      _ShutdownInstanceDisks(self, inst)
2956

    
2957

    
2958
class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    _CheckNodeOnline(self, instance.primary_node)

    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise()
    if remote_info.data:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))


  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
                                                     old_file_storage_dir,
                                                     new_file_storage_dir)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result.data[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self, inst, None)
    try:
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
                                                 old_name)
      msg = result.RemoteFailMsg()
      if msg:
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti): %s" %
               (inst.name, inst.primary_node, msg))
        self.proc.LogWarning(msg)
    finally:
      _ShutdownInstanceDisks(self, inst)


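# Editorial note (not part of the original module): the rename above is a
# four-step sequence -- rename in the cluster configuration, swap the
# instance lock, rename the file-storage directory for DT_FILE instances,
# then run the OS rename script on the primary node.  As an illustrative
# sketch only, the client-side opcode would look roughly like this
# (hypothetical names; field list taken from _OP_REQP above):
#
#   op = opcodes.OpRenameInstance(instance_name="inst1.example.com",
#                                 new_name="inst2.example.com")

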
class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node only.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logging.info("Shutting down instance %s on node %s",
                 instance.name, instance.primary_node)

    result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
    msg = result.RemoteFailMsg()
    if msg:
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance: %s" % msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, instance.primary_node, msg))

    logging.info("Removing block devices for instance %s", instance.name)

    if not _RemoveDisks(self, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logging.info("Removing instance %s out of cluster config", instance.name)

    self.cfg.RemoveInstance(instance.name)
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names", "use_locking"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
                                    "admin_state",
                                    "disk_template", "ip", "mac", "bridge",
                                    "sda_size", "sdb_size", "vcpus", "tags",
                                    "network_port", "beparams",
                                    r"(disk)\.(size)/([0-9]+)",
                                    r"(disk)\.(sizes)", "disk_usage",
                                    r"(nic)\.(mac|ip|bridge)/([0-9]+)",
                                    r"(nic)\.(macs|ips|bridges)",
                                    r"(disk|nic)\.(count)",
                                    "serial_no", "hypervisor", "hvparams",] +
                                  ["hv/%s" % name
                                   for name in constants.HVS_PARAMETERS] +
                                  ["be/%s" % name
                                   for name in constants.BES_PARAMETERS])
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")


  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.do_locking:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    all_info = self.cfg.GetAllInstancesInfo()
    if self.wanted == locking.ALL_SET:
      # caller didn't specify instance names, so ordering is not important
      if self.do_locking:
        instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
      else:
        instance_names = all_info.keys()
      instance_names = utils.NiceSort(instance_names)
    else:
      # caller did specify names, so we must keep the ordering
      if self.do_locking:
        tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
      else:
        tgt_set = all_info.keys()
      missing = set(self.wanted).difference(tgt_set)
      if missing:
        raise errors.OpExecError("Some instances were removed before"
                                 " retrieving their data: %s" % missing)
      instance_names = self.wanted

    instance_list = [all_info[iname] for iname in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])
    hv_list = list(set([inst.hypervisor for inst in instance_list]))

    bad_nodes = []
    off_nodes = []
    if self.do_node_query:
      live_data = {}
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
      for name in nodes:
        result = node_data[name]
        if result.offline:
          # offline nodes will be in both lists
          off_nodes.append(name)
        if result.failed:
          bad_nodes.append(name)
        else:
          if result.data:
            live_data.update(result.data)
            # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    HVPREFIX = "hv/"
    BEPREFIX = "be/"
    output = []
    for instance in instance_list:
      iout = []
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
      for field in self.op.output_fields:
        st_match = self._FIELDS_STATIC.Matches(field)
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = instance.admin_up
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in off_nodes:
            val = "ERROR_nodeoffline"
          elif instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.admin_up:
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.admin_up:
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          idx = ord(field[2]) - ord('a')
          try:
            val = instance.FindDisk(idx).size
          except errors.OpPrereqError:
            val = None
        elif field == "disk_usage": # total disk usage per node
          disk_sizes = [{'size': disk.size} for disk in instance.disks]
          val = _ComputeDiskSize(instance.disk_template, disk_sizes)
        elif field == "tags":
          val = list(instance.GetTags())
        elif field == "serial_no":
          val = instance.serial_no
        elif field == "network_port":
          val = instance.network_port
        elif field == "hypervisor":
          val = instance.hypervisor
        elif field == "hvparams":
          val = i_hv
        elif (field.startswith(HVPREFIX) and
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
          val = i_hv.get(field[len(HVPREFIX):], None)
        elif field == "beparams":
          val = i_be
        elif (field.startswith(BEPREFIX) and
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
          val = i_be.get(field[len(BEPREFIX):], None)
        elif st_match and st_match.groups():
          # matches a variable list
          st_groups = st_match.groups()
          if st_groups and st_groups[0] == "disk":
            if st_groups[1] == "count":
              val = len(instance.disks)
            elif st_groups[1] == "sizes":
              val = [disk.size for disk in instance.disks]
            elif st_groups[1] == "size":
              try:
                val = instance.FindDisk(st_groups[2]).size
              except errors.OpPrereqError:
                val = None
            else:
              assert False, "Unhandled disk parameter"
          elif st_groups[0] == "nic":
            if st_groups[1] == "count":
              val = len(instance.nics)
            elif st_groups[1] == "macs":
              val = [nic.mac for nic in instance.nics]
            elif st_groups[1] == "ips":
              val = [nic.ip for nic in instance.nics]
            elif st_groups[1] == "bridges":
              val = [nic.bridge for nic in instance.nics]
            else:
              # index-based item
              nic_idx = int(st_groups[2])
              if nic_idx >= len(instance.nics):
                val = None
              else:
                if st_groups[1] == "mac":
                  val = instance.nics[nic_idx].mac
                elif st_groups[1] == "ip":
                  val = instance.nics[nic_idx].ip
                elif st_groups[1] == "bridge":
                  val = instance.nics[nic_idx].bridge
                else:
                  assert False, "Unhandled NIC parameter"
          else:
            assert False, "Unhandled variable parameter"
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output


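# Editorial note (not part of the original module): output_fields may mix
# simple names with the parameterized fields matched by the regexps in
# _FIELDS_STATIC above, e.g. (hypothetical selection):
#
#   ["name", "pnode", "disk.count", "disk.size/0", "nic.mac/1", "be/memory"]
#
# Only the dynamic fields (oper_state, oper_ram, status) require live data
# from the nodes; a purely static selection skips both the per-node RPC and
# the locking.

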
class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    _CheckNodeOnline(self, target_node)
    _CheckNodeNotDrained(self, target_node)
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
                         instance.name, bep[constants.BE_MEMORY],
                         instance.hypervisor)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    result = self.rpc.call_bridges_exist(target_node, brlist)
    result.Raise()
    if not result.data:
      raise errors.OpPrereqError("One or more target bridges %s do not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self, dev, target_node, False):
        if instance.admin_up and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 instance.name, source_node)

    result = self.rpc.call_instance_shutdown(source_node, instance)
    msg = result.RemoteFailMsg()
    if msg:
      if self.op.ignore_consistency:
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
                             " Proceeding anyway. Please make sure node"
                             " %s is down. Error details: %s",
                             instance.name, source_node, source_node, msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, source_node, msg))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.admin_up:
      feedback_fn("* activating the instance's disks on target node")
      logging.info("Starting instance %s on node %s",
                   instance.name, target_node)

      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      result = self.rpc.call_instance_start(target_node, instance, None)
      msg = result.RemoteFailMsg()
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
                                 (instance.name, target_node, msg))


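# Editorial note (not part of the original module): failover is an offline
# move -- the code above checks disk consistency on the secondary, shuts the
# instance down on the primary, flips instance.primary_node in the
# configuration and, if the instance was marked up, reassembles the disks
# and starts it on the former secondary.  Only remotely mirrored disk
# templates (constants.DTS_NET_MIRROR) qualify.

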
class LUMigrateInstance(LogicalUnit):
  """Migrate an instance.

  This is migration without shutting down, compared to the failover,
  which is done with shutdown.

  """
  HPATH = "instance-migrate"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "live", "cleanup"]

  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    if instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " drbd8, cannot migrate.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ConfigurationError("No secondary node but using"
                                      " drbd8 disk template")

    i_be = self.cfg.GetClusterInfo().FillBE(instance)

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
                         instance.name, i_be[constants.BE_MEMORY],
                         instance.hypervisor)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    result = self.rpc.call_bridges_exist(target_node, brlist)
    if result.failed or not result.data:
      raise errors.OpPrereqError("One or more target bridges %s do not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

    if not self.op.cleanup:
      _CheckNodeNotDrained(self, target_node)
      result = self.rpc.call_instance_migratable(instance.primary_node,
                                                 instance)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
                                   msg)

    self.instance = instance

  def _WaitUntilSync(self):
    """Poll with custom rpc for disk sync.

    This uses our own step-based rpc call.

    """
    self.feedback_fn("* wait until resync is done")
    all_done = False
    while not all_done:
      all_done = True
      result = self.rpc.call_drbd_wait_sync(self.all_nodes,
                                            self.nodes_ip,
                                            self.instance.disks)
      min_percent = 100
      for node, nres in result.items():
        msg = nres.RemoteFailMsg()
        if msg:
          raise errors.OpExecError("Cannot resync disks on node %s: %s" %
                                   (node, msg))
        node_done, node_percent = nres.payload
        all_done = all_done and node_done
        if node_percent is not None:
          min_percent = min(min_percent, node_percent)
      if not all_done:
        if min_percent < 100:
          self.feedback_fn("   - progress: %.1f%%" % min_percent)
        time.sleep(2)

  def _EnsureSecondary(self, node):
    """Demote a node to secondary.

    """
    self.feedback_fn("* switching node %s to secondary mode" % node)

    for dev in self.instance.disks:
      self.cfg.SetDiskID(dev, node)

    result = self.rpc.call_blockdev_close(node, self.instance.name,
                                          self.instance.disks)
    msg = result.RemoteFailMsg()
    if msg:
      raise errors.OpExecError("Cannot change disk to secondary on node %s,"
                               " error %s" % (node, msg))

  def _GoStandalone(self):
    """Disconnect from the network.

    """
    self.feedback_fn("* changing into standalone mode")
    result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
                                               self.instance.disks)
    for node, nres in result.items():
      msg = nres.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Cannot disconnect disks on node %s,"
                                 " error %s" % (node, msg))

  def _GoReconnect(self, multimaster):
    """Reconnect to the network.

    """
    if multimaster:
      msg = "dual-master"
    else:
      msg = "single-master"
    self.feedback_fn("* changing disks into %s mode" % msg)
    result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
                                           self.instance.disks,
                                           self.instance.name, multimaster)
    for node, nres in result.items():
      msg = nres.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Cannot change disks config on node %s,"
                                 " error: %s" % (node, msg))

  def _ExecCleanup(self):
    """Try to clean up after a failed migration.

    The cleanup is done by:
      - check that the instance is running only on one node
        (and update the config if needed)
      - change disks on its secondary node to secondary
      - wait until disks are fully synchronized
      - disconnect from the network
      - change disks into single-master mode
      - wait again until disks are fully synchronized

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    # check running on only one node
    self.feedback_fn("* checking where the instance actually runs"
                     " (if this hangs, the hypervisor might be in"
                     " a bad state)")
    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
    for node, result in ins_l.items():
      result.Raise()
      if not isinstance(result.data, list):
        raise errors.OpExecError("Can't contact node '%s'" % node)

    runningon_source = instance.name in ins_l[source_node].data
    runningon_target = instance.name in ins_l[target_node].data

    if runningon_source and runningon_target:
      raise errors.OpExecError("Instance seems to be running on two nodes,"
                               " or the hypervisor is confused. You will have"
                               " to ensure manually that it runs only on one"
                               " and restart this operation.")

    if not (runningon_source or runningon_target):
      raise errors.OpExecError("Instance does not seem to be running at all."
                               " In this case, it's safer to repair by"
                               " running 'gnt-instance stop' to ensure disk"
                               " shutdown, and then restarting it.")

    if runningon_target:
      # the migration has actually succeeded, we need to update the config
      self.feedback_fn("* instance running on secondary node (%s),"
                       " updating config" % target_node)
      instance.primary_node = target_node
      self.cfg.Update(instance)
      demoted_node = source_node
    else:
      self.feedback_fn("* instance confirmed to be running on its"
                       " primary node (%s)" % source_node)
      demoted_node = target_node

    self._EnsureSecondary(demoted_node)
    try:
      self._WaitUntilSync()
    except errors.OpExecError:
      # we ignore errors here, since if the device is standalone, it
      # won't be able to sync
      pass
    self._GoStandalone()
    self._GoReconnect(False)
    self._WaitUntilSync()

    self.feedback_fn("* done")

  def _RevertDiskStatus(self):
    """Try to revert the disk status after a failed migration.

    """
    target_node = self.target_node
    try:
      self._EnsureSecondary(target_node)
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()
    except errors.OpExecError, err:
      self.LogWarning("Migration failed and I can't reconnect the"
                      " drives: error '%s'\n"
                      "Please look and recover the instance status" %
                      str(err))

  def _AbortMigration(self):
    """Call the hypervisor code to abort a started migration.

    """
    instance = self.instance
    target_node = self.target_node
    migration_info = self.migration_info

    abort_result = self.rpc.call_finalize_migration(target_node,
                                                    instance,
                                                    migration_info,
                                                    False)
    abort_msg = abort_result.RemoteFailMsg()
    if abort_msg:
      logging.error("Aborting migration failed on target node %s: %s" %
                    (target_node, abort_msg))
      # Don't raise an exception here, as we still have to try to revert the
      # disk status, even if this step failed.

  def _ExecMigration(self):
    """Migrate an instance.

    The migration is done by:
      - change the disks into dual-master mode
      - wait until disks are fully synchronized again
      - migrate the instance
      - change disks on the new secondary node (the old primary) to secondary
      - wait until disks are fully synchronized
      - change disks into single-master mode

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    self.feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      if not _CheckDiskConsistency(self, dev, target_node, False):
        raise errors.OpExecError("Disk %s is degraded or not fully"
                                 " synchronized on target node,"
                                 " aborting migration." % dev.iv_name)

    # First get the migration information from the remote node
    result = self.rpc.call_migration_info(source_node, instance)
    msg = result.RemoteFailMsg()
    if msg:
      log_err = ("Failed fetching source migration information from %s: %s" %
                 (source_node, msg))
      logging.error(log_err)
      raise errors.OpExecError(log_err)

    self.migration_info = migration_info = result.payload

    # Then switch the disks to master/master mode
    self._EnsureSecondary(target_node)
    self._GoStandalone()
    self._GoReconnect(True)
    self._WaitUntilSync()

    self.feedback_fn("* preparing %s to accept the instance" % target_node)
    result = self.rpc.call_accept_instance(target_node,
                                           instance,
                                           migration_info,
                                           self.nodes_ip[target_node])

    msg = result.RemoteFailMsg()
    if msg:
      logging.error("Instance pre-migration failed, trying to revert"
                    " disk status: %s", msg)
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
                               (instance.name, msg))

    self.feedback_fn("* migrating instance to %s" % target_node)
    time.sleep(10)
    result = self.rpc.call_instance_migrate(source_node, instance,
                                            self.nodes_ip[target_node],
                                            self.op.live)
    msg = result.RemoteFailMsg()
    if msg:
      logging.error("Instance migration failed, trying to revert"
                    " disk status: %s", msg)
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not migrate instance %s: %s" %
                               (instance.name, msg))
    time.sleep(10)

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    result = self.rpc.call_finalize_migration(target_node,
                                              instance,
                                              migration_info,
                                              True)
    msg = result.RemoteFailMsg()
    if msg:
      logging.error("Instance migration succeeded, but finalization failed:"
                    " %s" % msg)
      raise errors.OpExecError("Could not finalize instance migration: %s" %
                               msg)

    self._EnsureSecondary(source_node)
    self._WaitUntilSync()
    self._GoStandalone()
    self._GoReconnect(False)
    self._WaitUntilSync()

    self.feedback_fn("* done")

  def Exec(self, feedback_fn):
    """Perform the migration.

    """
    self.feedback_fn = feedback_fn

    self.source_node = self.instance.primary_node
    self.target_node = self.instance.secondary_nodes[0]
    self.all_nodes = [self.source_node, self.target_node]
    self.nodes_ip = {
      self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
      self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
      }
    if self.op.cleanup:
      return self._ExecCleanup()
    else:
      return self._ExecMigration()


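# Editorial note (not part of the original module): live migration drives the
# DRBD devices through an explicit state machine -- demote the target to
# secondary, go standalone, reconnect in dual-master mode for the copy, and
# after the hypervisor has moved the instance demote the old primary, go
# standalone again and reconnect in single-master mode -- with
# _WaitUntilSync() between the steps.  _ExecCleanup() replays a similar
# sequence to recover from a migration that died half-way.

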
def _CreateBlockDev(lu, node, instance, device, force_create,
                    info, force_open):
  """Create a tree of block devices on a given node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  @param lu: the lu on whose behalf we execute
  @param node: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @type force_create: boolean
  @param force_create: whether to force creation of this device; this
      will be changed to True whenever we find a device which has
      CreateOnSecondary() attribute
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution

  """
  if device.CreateOnSecondary():
    force_create = True

  if device.children:
    for child in device.children:
      _CreateBlockDev(lu, node, instance, child, force_create,
                      info, force_open)

  if not force_create:
    return

  _CreateSingleBlockDev(lu, node, instance, device, info, force_open)


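# Editorial note (not part of the original module): for a DRBD8 disk the
# recursion above creates the two LV children (data and metadata) before the
# DRBD device that stacks on top of them; force_create is promoted to True as
# soon as a device answers CreateOnSecondary(), which is how mirrored devices
# also get built on the secondary node even though the caller passed
# force_create=False for it.

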
def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
  """Create a single block device on a given node.

  This will not recurse over children of the device, so they must be
  created in advance.

  @param lu: the lu on whose behalf we execute
  @param node: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution

  """
  lu.cfg.SetDiskID(device, node)
  result = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, force_open, info)
  msg = result.RemoteFailMsg()
  if msg:
    raise errors.OpExecError("Can't create block device %s on"
                             " node %s for instance %s: %s" %
                             (device, node, instance.name, msg))
  if device.physical_id is None:
    device.physical_id = result.payload


def _GenerateUniqueNames(lu, exts):
  """Generate a suitable LV name.

  This will generate one unique logical volume name for each extension
  given in C{exts}.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results


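# Editorial note (not part of the original module): a sketch of the names
# produced above, assuming GenerateUniqueID() returns UUID-like strings:
#
#   _GenerateUniqueNames(lu, [".disk0", ".disk1"])
#   => ["<uuid-a>.disk0", "<uuid-b>.disk1"]

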
def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
                         p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  port = lu.cfg.AllocatePort()
  vgname = lu.cfg.GetVGName()
  shared_secret = lu.cfg.GenerateDRBDSecret()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev


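# Editorial note (not part of the original module): the helper above returns
# a three-node disk tree -- an LD_DRBD8 device whose children are the data LV
# (the full requested size) and a fixed 128 MB metadata LV; its logical_id
# bundles both node names, the allocated port, both minors and the shared
# secret, which is what the backend needs to bring the mirror up.

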
def _GenerateDiskTemplate(lu, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_info,
                          file_storage_dir, file_driver,
                          base_index):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []
  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(lu, [".disk%d" % i
                                      for i in range(disk_count)])
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
                              logical_id=(vgname, names[idx]),
                              iv_name="disk/%d" % disk_index,
                              mode=disk["mode"])
      disks.append(disk_dev)
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node, remote_node] * len(disk_info), instance_name)

    names = []
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
                                               for i in range(disk_count)]):
      names.append(lv_prefix + "_data")
      names.append(lv_prefix + "_meta")
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                      disk["size"], names[idx*2:idx*2+2],
                                      "disk/%d" % disk_index,
                                      minors[idx*2], minors[idx*2+1])
      disk_dev.mode = disk["mode"]
      disks.append(disk_dev)
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
                              iv_name="disk/%d" % disk_index,
                              logical_id=(file_driver,
                                          "%s/disk%d" % (file_storage_dir,
                                                         disk_index)),
                              mode=disk["mode"])
      disks.append(disk_dev)
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks


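# Editorial note (not part of the original module): an illustrative call for
# a two-disk DRBD8 instance (hypothetical names and sizes):
#
#   _GenerateDiskTemplate(lu, constants.DT_DRBD8, "inst1.example.com",
#                         "node1", ["node2"],
#                         [{"size": 10240, "mode": "rw"},
#                          {"size": 4096, "mode": "rw"}],
#                         None, None, 0)
#
# would allocate four DRBD minors (two per node) and return two disk trees
# named "disk/0" and "disk/1".

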
def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


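# Editorial note (not part of the original module): for an instance named
# "inst1.example.com" the text above becomes "originstname+inst1.example.com";
# it is attached to the block devices (as an LVM tag for LV-based disks) so
# volumes can be traced back to their instance.

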
def _CreateDisks(lu, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @raise errors.OpExecError: in case of an error during disk creation

  """
  info = _GetInstanceInfoText(instance)
  pnode = instance.primary_node

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)

    if result.failed or not result.data:
      raise errors.OpExecError("Could not connect to node '%s'" % pnode)

    if not result.data[0]:
      raise errors.OpExecError("Failed to create directory '%s'" %
                               file_storage_dir)

  # Note: this needs to be kept in sync with adding of disks in
  # LUSetInstanceParams
  for device in instance.disks:
    logging.info("Creating volume %s for instance %s",
                 device.iv_name, instance.name)
    #HARDCODE
    for node in instance.all_nodes:
      f_create = node == pnode
      _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)


def _RemoveDisks(lu, instance):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should remove
  @rtype: boolean
  @return: the success of the removal

  """
  logging.info("Removing block devices for instance %s", instance.name)

  all_result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(disk, node)
      msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
      if msg:
        lu.LogWarning("Could not remove block device %s on node %s,"
                      " continuing anyway: %s", device.iv_name, node, msg)
        all_result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
                                                 file_storage_dir)
    if result.failed or not result.data:
      logging.error("Could not remove directory '%s'", file_storage_dir)
      all_result = False

  return all_result


def _ComputeDiskSize(disk_template, disks):
  """Compute disk size requirements in the volume group

  """
  # Required free disk space as a function of the disk template and disk sizes
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: sum(d["size"] for d in disks),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]


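# Editorial note (not part of the original module): a worked example of the
# size computation above, with hypothetical disk sizes in MB:
#
#   _ComputeDiskSize(constants.DT_PLAIN, [{"size": 1024}, {"size": 512}])
#   => 1536
#   _ComputeDiskSize(constants.DT_DRBD8, [{"size": 1024}, {"size": 512}])
#   => 1792   # 128 MB of DRBD metadata added per disk
#
# Diskless and file-based templates need no volume group space, hence None.

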
def _CheckHVParams(lu, nodenames, hvname, hvparams):
  """Hypervisor parameter validation.

  This function abstracts the hypervisor parameter validation to be
  used in both instance create and instance modify.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type nodenames: list
  @param nodenames: the list of nodes on which we should check
  @type hvname: string
  @param hvname: the name of the hypervisor we should use
  @type hvparams: dict
  @param hvparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
                                                  hvname,
                                                  hvparams)
  for node in nodenames:
    info = hvinfo[node]
    if info.offline:
      continue
    msg = info.RemoteFailMsg()
    if msg:
      raise errors.OpPrereqError("Hypervisor parameter validation failed:"
                                 " %s" % msg)


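# Editorial note (not part of the original module): a typical, illustrative
# use of the validator above is to check the filled hypervisor parameters on
# every node that may host the instance, e.g.:
#
#   _CheckHVParams(self, nodenames, self.op.hypervisor, filled_hvp)
#
# Offline nodes are skipped, so a down node does not block the operation.

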
class LUCreateInstance(LogicalUnit):
4190
  """Create an instance.
4191

4192
  """
4193
  HPATH = "instance-add"
4194
  HTYPE = constants.HTYPE_INSTANCE
4195
  _OP_REQP = ["instance_name", "disks", "disk_template",
4196
              "mode", "start",
4197
              "wait_for_sync", "ip_check", "nics",
4198
              "hvparams", "beparams"]
4199
  REQ_BGL = False
4200

    
4201
  def _ExpandNode(self, node):
    """Expands and checks one node name.

    """
    node_full = self.cfg.ExpandNodeName(node)
    if node_full is None:
      raise errors.OpPrereqError("Unknown node %s" % node)
    return node_full

  def ExpandNames(self):
    """ExpandNames for CreateInstance.

    Figure out the right locks for instance creation.

    """
    self.needed_locks = {}

    # set optional parameters to none if they don't exist
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    # cheap checks, mostly valid constants given

    # verify creation mode
    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    if self.op.hypervisor is None:
      self.op.hypervisor = self.cfg.GetHypervisorType()

    cluster = self.cfg.GetClusterInfo()
    enabled_hvs = cluster.enabled_hypervisors
    if self.op.hypervisor not in enabled_hvs:
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
                                 " cluster (%s)" % (self.op.hypervisor,
                                  ",".join(enabled_hvs)))

    # check hypervisor parameter syntax (locally)
    utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
    filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
                                  self.op.hvparams)
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
    hv_type.CheckParameterSyntax(filled_hvp)

    # fill and remember the beparams dict
    utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
    self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                    self.op.beparams)

    #### instance parameters check

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)
    self.op.instance_name = instance_name = hostname1.name

    # this is just a preventive check, but someone might still add this
    # instance in the meantime, and creation will fail at lock-add time
    if instance_name in self.cfg.GetInstanceList():
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    self.add_locks[locking.LEVEL_INSTANCE] = instance_name

    # NIC buildup
    self.nics = []
    for nic in self.op.nics:
      # ip validity checks
      ip = nic.get("ip", None)
      if ip is None or ip.lower() == "none":
        nic_ip = None
      elif ip.lower() == constants.VALUE_AUTO:
        nic_ip = hostname1.ip
      else:
        if not utils.IsValidIP(ip):
          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
                                     " like a valid IP" % ip)
        nic_ip = ip

      # MAC address verification
      mac = nic.get("mac", constants.VALUE_AUTO)
      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
        if not utils.IsValidMac(mac.lower()):
          raise errors.OpPrereqError("Invalid MAC address specified: %s" %
                                     mac)
      # bridge verification
      bridge = nic.get("bridge", None)
      if bridge is None:
        bridge = self.cfg.GetDefBridge()
      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))

    # disk checks/pre-build
    self.disks = []
    for disk in self.op.disks:
      mode = disk.get("mode", constants.DISK_RDWR)
      if mode not in constants.DISK_ACCESS_SET:
        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                   mode)
      size = disk.get("size", None)
      if size is None:
        raise errors.OpPrereqError("Missing disk size")
      try:
        size = int(size)
      except ValueError:
        raise errors.OpPrereqError("Invalid disk size '%s'" % size)
      self.disks.append({"size": size, "mode": mode})

    # used in CheckPrereq for ip ping check
    self.check_ip = hostname1.ip

    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory path not absolute")

    ### Node/iallocator related checks
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.op.pnode = self._ExpandNode(self.op.pnode)
      nodelist = [self.op.pnode]
      if self.op.snode is not None:
        self.op.snode = self._ExpandNode(self.op.snode)
        nodelist.append(self.op.snode)
      self.needed_locks[locking.LEVEL_NODE] = nodelist

    # in case of import lock the source node too
    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)

      if src_path is None:
        self.op.src_path = src_path = self.op.instance_name

      if src_node is None:
        self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
        self.op.src_node = None
        if os.path.isabs(src_path):
          raise errors.OpPrereqError("Importing an instance from an absolute"
                                     " path requires a source node option.")
      else:
        self.op.src_node = src_node = self._ExpandNode(src_node)
        if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
          self.needed_locks[locking.LEVEL_NODE].append(src_node)
        if not os.path.isabs(src_path):
          self.op.src_path = src_path = \
            os.path.join(constants.EXPORT_DIR, src_path)

    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

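  # Rough sketch of the lock state built by ExpandNames above (for
  # illustration only):
  #   add_locks[locking.LEVEL_INSTANCE]  = the new instance name
  #   needed_locks[locking.LEVEL_NODE]   = locking.ALL_SET when an iallocator
  #                                        chooses the nodes, otherwise the
  #                                        explicit [pnode(, snode)(, src_node)]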
  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    nics = [n.ToDict() for n in self.nics]
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.be_full[constants.BE_VCPUS],
                     mem_size=self.be_full[constants.BE_MEMORY],
                     disks=self.disks,
                     nics=nics,
                     hypervisor=self.op.hypervisor,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 ", ".join(ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGES"] = self.src_images

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.op.start,
      os_type=self.op.os_type,
      memory=self.be_full[constants.BE_MEMORY],
      vcpus=self.be_full[constants.BE_VCPUS],
      nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
    ))

    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl


  def CheckPrereq(self):
    """Check prerequisites.

    """
    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = self.op.src_node
      src_path = self.op.src_path

      if src_node is None:
        exp_list = self.rpc.call_export_list(
          self.acquired_locks[locking.LEVEL_NODE])
        found = False
        for node in exp_list:
          if not exp_list[node].failed and src_path in exp_list[node].data:
            found = True
            self.op.src_node = src_node = node
            self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
                                                       src_path)
            break
        if not found:
          raise errors.OpPrereqError("No export found for relative path %s" %
                                      src_path)

      _CheckNodeOnline(self, src_node)
      result = self.rpc.call_export_info(src_node, src_path)
      result.Raise()
      if not result.data:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      export_info = result.data
      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      # Check that the new instance doesn't have less disks than the export
      instance_disks = len(self.disks)
      export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
      if instance_disks < export_disks:
        raise errors.OpPrereqError("Not enough disks to import."
                                   " (instance: %d, export: %d)" %
                                   (instance_disks, export_disks))

      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      disk_images = []
      for idx in range(export_disks):
        option = 'disk%d_dump' % idx
        if export_info.has_option(constants.INISECT_INS, option):
          # FIXME: are the old os-es, disk sizes, etc. useful?
          export_name = export_info.get(constants.INISECT_INS, option)
          image = os.path.join(src_path, export_name)
          disk_images.append(image)
        else:
          disk_images.append(False)

      self.src_images = disk_images

      old_name = export_info.get(constants.INISECT_INS, 'name')
      # FIXME: int() here could throw a ValueError on broken exports
      exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
      if self.op.instance_name == old_name:
        for idx, nic in enumerate(self.nics):
          if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
            nic_mac_ini = 'nic%d_mac' % idx
            nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)

    # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
    # ip ping checks (we use the same ip that was resolved in ExpandNames)
    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (self.check_ip, self.op.instance_name))

    #### mac address generation
    # By generating here the mac address both the allocator and the hooks get
    # the real final mac address rather than the 'auto' or 'generate' value.
    # There is a race condition between the generation and the instance object
    # creation, which means that we know the mac is valid now, but we're not
    # sure it will be when we actually add the instance. If things go bad
    # adding the instance will abort because of a duplicate mac, and the
    # creation job will fail.
    for nic in self.nics:
      if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
        nic.mac = self.cfg.GenerateMAC()

    #### allocator run

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
    assert self.pnode is not None, \
      "Cannot retrieve locked node %s" % self.op.pnode
    if pnode.offline:
      raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
                                 pnode.name)
    if pnode.drained:
      raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
                                 pnode.name)

    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if self.op.snode is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")
      if self.op.snode == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      _CheckNodeOnline(self, self.op.snode)
      _CheckNodeNotDrained(self, self.op.snode)
      self.secondaries.append(self.op.snode)

    nodenames = [pnode.name] + self.secondaries

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.disks)

    # Check lv size requirements
    if req_size is not None:
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                         self.op.hypervisor)
      for node in nodenames:
        info = nodeinfo[node]
        info.Raise()
        info = info.data
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)

    # os verification
    result = self.rpc.call_os_get(pnode.name, self.op.os_type)
    result.Raise()
    if not isinstance(result.data, objects.OS):
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node"  % self.op.os_type)

    # bridge check on primary node
    bridges = [n.bridge for n in self.nics]
    result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
    result.Raise()
    if not result.data:
      raise errors.OpPrereqError("One of the target bridges '%s' does not"
                                 " exist on destination node '%s'" %
                                 (",".join(bridges), pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.be_full[constants.BE_MEMORY],
                           self.op.hypervisor)

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    ht_kind = self.op.hypervisor
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    ##if self.op.vnc_bind_address is None:
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.cfg.GetFileStorageDir(),
                                        string_file_storage_dir, instance))


    disks = _GenerateDiskTemplate(self,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries,
                                  self.disks,
                                  file_storage_dir,
                                  self.op.file_driver,
                                  0)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            nics=self.nics, disks=disks,
                            disk_template=self.op.disk_template,
                            admin_up=False,
                            network_port=network_port,
                            beparams=self.op.beparams,
                            hvparams=self.op.hvparams,
                            hypervisor=self.op.hypervisor,
                            )

    feedback_fn("* creating instance disks...")
    try:
      _CreateDisks(self, iobj)
    except errors.OpExecError:
      self.LogWarning("Device creation failed, reverting...")
      try:
        _RemoveDisks(self, iobj)
      finally:
        self.cfg.ReleaseDRBDMinors(instance)
        raise

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Declare that we don't want to remove the instance lock anymore, as we've
    # added the instance to the config
    del self.remove_locks[locking.LEVEL_INSTANCE]
    # Unlock all the nodes
    if self.op.mode == constants.INSTANCE_IMPORT:
      nodes_keep = [self.op.src_node]
      nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
                       if node != self.op.src_node]
      self.context.glm.release(locking.LEVEL_NODE, nodes_release)
      self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
    else:
      self.context.glm.release(locking.LEVEL_NODE)
      del self.acquired_locks[locking.LEVEL_NODE]

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, iobj)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(self, iobj)
      self.cfg.RemoveInstance(iobj.name)
      # Make sure the instance lock gets removed
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        result = self.rpc.call_instance_os_add(pnode_name, iobj)
        msg = result.RemoteFailMsg()
        if msg:
          raise errors.OpExecError("Could not add os for instance %s"
                                   " on node %s: %s" %
                                   (instance, pnode_name, msg))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_images = self.src_images
        cluster_name = self.cfg.GetClusterName()
        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
                                                         src_node, src_images,
                                                         cluster_name)
        import_result.Raise()
        for idx, result in enumerate(import_result.data):
          if not result:
            self.LogWarning("Could not import the image %s for instance"
                            " %s, disk %d, on node %s" %
                            (src_images[idx], instance, idx, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      iobj.admin_up = True
      self.cfg.Update(iobj)
      logging.info("Starting instance %s on node %s", instance, pnode_name)
      feedback_fn("* starting instance...")
      result = self.rpc.call_instance_start(pnode_name, iobj, None)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not start instance: %s" % msg)


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = self.rpc.call_instance_list([node],
                                             [instance.hypervisor])[node]
    node_insts.Raise()

    if instance.name not in node_insts.data:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logging.debug("Connecting to console of %s on %s", instance.name, node)

    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    cluster = self.cfg.GetClusterInfo()
    # beparams and hvparams are passed separately, to avoid editing the
    # instance and then saving the defaults in the instance itself.
    hvparams = cluster.FillHV(instance)
    beparams = cluster.FillBE(instance)
    console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]
  REQ_BGL = False

  def CheckArguments(self):
    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None
    if not hasattr(self.op, "iallocator"):
      self.op.iallocator = None

    # check for valid parameter combination
    cnt = [self.op.remote_node, self.op.iallocator].count(None)
    if self.op.mode == constants.REPLACE_DISK_CHG:
      if cnt == 2:
        raise errors.OpPrereqError("When changing the secondary either an"
                                   " iallocator script must be used or the"
                                   " new node given")
      elif cnt == 0:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
    else: # not replacing the secondary
      if cnt != 2:
        raise errors.OpPrereqError("The iallocator and new node options can"
                                   " be used only when changing the"
                                   " secondary node")

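  # Summary of the valid parameter combinations checked above (illustrative):
  #   mode == constants.REPLACE_DISK_CHG -> exactly one of iallocator or
  #                                         remote_node must be given
  #   any other mode                     -> neither iallocator nor remote_node
  #                                         may be given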
  def ExpandNames(self):
    self._ExpandAndLockInstance()

    if self.op.iallocator is not None:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    elif self.op.remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.op.remote_node = remote_node
      # Warning: do not remove the locking of the new secondary here
      # unless DRBD8.AddChildren is changed to work in parallel;
      # currently it doesn't since parallel invocations of
      # FindUnusedMinor will conflict
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    # If we're not already locking all nodes in the set we have to declare the
    # instance's primary/secondary nodes.
    if (level == locking.LEVEL_NODE and
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
      self._LockInstancesNodes()

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    self.LogInfo("Selected new secondary for the instance: %s",
                 self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

    if instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
                                 " instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    if self.op.iallocator is not None:
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      raise errors.OpPrereqError("The specified node is already the"
                                 " secondary node of the instance.")

    if self.op.mode == constants.REPLACE_DISK_PRI:
      n1 = self.tgt_node = instance.primary_node
      n2 = self.oth_node = self.sec_node
    elif self.op.mode == constants.REPLACE_DISK_SEC:
      n1 = self.tgt_node = self.sec_node
      n2 = self.oth_node = instance.primary_node
    elif self.op.mode == constants.REPLACE_DISK_CHG:
      n1 = self.new_node = remote_node
      n2 = self.oth_node = instance.primary_node
      self.tgt_node = self.sec_node
      _CheckNodeNotDrained(self, remote_node)
    else:
      raise errors.ProgrammerError("Unhandled disk replace mode")

    _CheckNodeOnline(self, n1)
    _CheckNodeOnline(self, n2)

    if not self.op.disks:
      self.op.disks = range(len(instance.disks))

    for disk_idx in self.op.disks:
      instance.FindDisk(disk_idx)

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:

      1. for each disk to be replaced:

        1. create new LVs on the target node with unique names
        1. detach old LVs from the drbd device
        1. rename old LVs to name_replaced.<time_t>
        1. rename new LVs to old LVs
        1. attach the new LVs (with the old names now) to the drbd device

      1. wait for sync across all devices

      1. for each modified disk:

        1. remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
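    # Illustrative sketch of the per-LV rename sequence performed below
    # (the LV names are examples only):
    #   1. the old data LV ".disk0_data" is renamed to
    #      ".disk0_data_replaced-<time_t>"
    #   2. the freshly created LV is renamed to the old name ".disk0_data"
    #   3. the renamed new LVs are re-attached as children of the DRBD device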
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = self.rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results[node]
      if res.failed or not res.data or my_vg not in res.data:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking disk/%d on %s" % (idx, node))
        cfg.SetDiskID(dev, node)
        result = self.rpc.call_blockdev_find(node, dev)
        msg = result.RemoteFailMsg()
        if not msg and not result.payload:
          msg = "disk not found"
        if msg:
          raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
                                   (idx, node, msg))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      info("checking disk/%d consistency on %s" % (idx, oth_node))
      if not _CheckDiskConsistency(self, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".disk%d_%s" % (idx, suf)
                  for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(self, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # we pass force_create=True to force the LVM creation
      for new_lv in new_lvs:
        _CreateBlockDev(self, tgt_node, instance, new_lv, True,
                        _GetInstanceInfoText(instance), False)

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        result = self.rpc.call_blockdev_find(tgt_node, to_ren)
        if not result.RemoteFailMsg() and result.payload:
          # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
      if result.failed or not result.data:
        for new_lv in new_lvs:
          msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
          if msg:
            warning("Can't rollback device %s: %s", dev, msg,
                    hint="cleanup manually the unused logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(self, instance, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      result = self.rpc.call_blockdev_find(instance.primary_node, dev)
      msg = result.RemoteFailMsg()
      if not msg and not result.payload:
        msg = "disk not found"
      if msg:
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
                                 (name, msg))
      if result.payload[5]:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
        if msg:
          warning("Can't remove old LV: %s" % msg,
                  hint="manually remove unused LVs")
          continue

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
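    # Sketch of the DRBD logical_id rewrite performed below (illustrative;
    # the node ordering inside the original id may be reversed):
    #   old id:       (pri_node, old_node, port, p_minor, old_minor, secret)
    #   new_alone_id: (pri_node, new_node, None, p_minor, new_minor, secret)
    #   new_net_id:   (pri_node, new_node, port, p_minor, new_minor, secret)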
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node
    nodes_ip = {
      old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
      new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
      pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
      }

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = self.rpc.call_vg_list([pri_node, new_node])
    for node in pri_node, new_node:
      res = results[node]
      if res.failed or not res.data or my_vg not in res.data:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      info("checking disk/%d on %s" % (idx, pri_node))
      cfg.SetDiskID(dev, pri_node)
      result = self.rpc.call_blockdev_find(pri_node, dev)
      msg = result.RemoteFailMsg()
      if not msg and not result.payload:
        msg = "disk not found"
      if msg:
        raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
                                 (idx, pri_node, msg))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      info("checking disk/%d consistency on %s" % (idx, pri_node))
      if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for idx, dev in enumerate(instance.disks):
      info("adding new local storage on %s for disk/%d" %
           (new_node, idx))
      # we pass force_create=True to force LVM creation
      for new_lv in dev.children:
        _CreateBlockDev(self, new_node, instance, new_lv, True,
                        _GetInstanceInfoText(instance), False)

    # Step 4: drbd minors and drbd setup changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
                                   instance.name)
    logging.debug("Allocated minors %s" % (minors,))
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
      size = dev.size
      info("activating a new drbd on %s for disk/%d" % (new_node, idx))
      # create new devices on new_node; note that we create two IDs:
      # one without port, so the drbd will be activated without
      # networking information on the new node at this stage, and one
      # with network, for the latter activation in step 4
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
      if pri_node == o_node1:
        p_minor = o_minor1
      else:
        p_minor = o_minor2

      new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
      new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)

      iv_names[idx] = (dev, dev.children, new_net_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_net_id)
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=new_alone_id,
                              children=dev.children)
      try:
        _CreateSingleBlockDev(self, new_node, instance, new_drbd,
                              _GetInstanceInfoText(instance), False)
      except errors.BlockDeviceError:
        self.cfg.ReleaseDRBDMinors(instance.name)
        raise

    for idx, dev in enumerate(instance.disks):
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for disk/%d on old node" % idx)
      cfg.SetDiskID(dev, old_node)
      msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
      if msg:
        warning("Failed to shutdown drbd for disk/%d on old node: %s" %
                (idx, msg),
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
                                               instance.disks)[pri_node]

    msg = result.RemoteFailMsg()
    if msg:
      # detaches didn't succeed (unlikely)
      self.cfg.ReleaseDRBDMinors(instance.name)
      raise errors.OpExecError("Can't detach the disks from the network on"
                               " old node: %s" % (msg,))

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev, _, new_logical_id in iv_names.itervalues():
      dev.logical_id = new_logical_id
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
                                           instance.disks, instance.name,
                                           False)
    for to_node, to_result in result.items():
      msg = to_result.RemoteFailMsg()
      if msg:
        warning("can't attach drbd disks on node %s: %s", to_node, msg,
                hint="please do a gnt-instance info to see the"
                " status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(self, instance, unlock=True)

    # so check manually all the devices
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      result = self.rpc.call_blockdev_find(pri_node, dev)
      msg = result.RemoteFailMsg()
      if not msg and not result.payload:
        msg = "disk not found"
      if msg:
        raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
                                 (idx, msg))
      if result.payload[5]:
        raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
      info("remove logical volumes for disk/%d" % idx)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
        if msg:
          warning("Can't remove LV on old secondary: %s", msg,
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance

    # Activate the instance disks if we're replacing them on a down instance
    if not instance.admin_up:
      _StartInstanceDisks(self, instance, True)

    if self.op.mode == constants.REPLACE_DISK_CHG:
      fn = self._ExecD8Secondary
    else:
      fn = self._ExecD8DiskOnly

    ret = fn(feedback_fn)

    # Deactivate the instance disks if we're replacing them on a down instance
    if not instance.admin_up:
      _SafeShutdownInstanceDisks(self, instance)

    return ret


class LUGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
  REQ_BGL = False

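  # Illustrative opcode parameters (example values only): grow disk index 0
  # of an instance by 1024 MiB and wait for the resync to finish:
  #   instance_name="inst1.example.com", disk=0, amount=1024,
  #   wait_for_sync=True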
  def ExpandNames(self):
5379
    self._ExpandAndLockInstance()
5380
    self.needed_locks[locking.LEVEL_NODE] = []
5381
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5382

    
5383
  def DeclareLocks(self, level):
5384
    if level == locking.LEVEL_NODE:
5385
      self._LockInstancesNodes()
5386

    
5387
  def BuildHooksEnv(self):
5388
    """Build hooks env.
5389

5390
    This runs on the master, the primary and all the secondaries.
5391

5392
    """
5393
    env = {
5394
      "DISK": self.op.disk,
5395
      "AMOUNT": self.op.amount,
5396
      }
5397
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5398
    nl = [
5399
      self.cfg.GetMasterNode(),
5400
      self.instance.primary_node,
5401
      ]
5402
    return env, nl, nl
5403

    
5404
  def CheckPrereq(self):
5405
    """Check prerequisites.
5406

5407
    This checks that the instance is in the cluster.
5408

5409
    """
5410
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5411
    assert instance is not None, \
5412
      "Cannot retrieve locked instance %s" % self.op.instance_name
5413
    nodenames = list(instance.all_nodes)
5414
    for node in nodenames:
5415
      _CheckNodeOnline(self, node)
5416

    
5417

    
5418
    self.instance = instance
5419

    
5420
    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5421
      raise errors.OpPrereqError("Instance's disk layout does not support"
5422
                                 " growing.")
5423

    
5424
    self.disk = instance.FindDisk(self.op.disk)
5425

    
5426
    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5427
                                       instance.hypervisor)
5428
    for node in nodenames:
5429
      info = nodeinfo[node]
5430
      if info.failed or not info.data:
5431
        raise errors.OpPrereqError("Cannot get current information"
5432
                                   " from node '%s'" % node)
5433
      vg_free = info.data.get('vg_free', None)
5434
      if not isinstance(vg_free, int):
5435
        raise errors.OpPrereqError("Can't compute free disk space on"
5436
                                   " node %s" % node)
5437
      if self.op.amount > vg_free:
5438
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
5439
                                   " %d MiB available, %d MiB required" %
5440
                                   (node, vg_free, self.op.amount))
5441

    
5442
  def Exec(self, feedback_fn):
5443
    """Execute disk grow.
5444

5445
    """
5446
    instance = self.instance
5447
    disk = self.disk
5448
    for node in instance.all_nodes:
5449
      self.cfg.SetDiskID(disk, node)
5450
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5451
      msg = result.RemoteFailMsg()
5452
      if msg:
5453
        raise errors.OpExecError("Grow request failed to node %s: %s" %
5454
                                 (node, msg))
5455
    disk.RecordGrow(self.op.amount)
5456
    self.cfg.Update(instance)
5457
    if self.op.wait_for_sync:
5458
      disk_abort = not _WaitForSync(self, instance)
5459
      if disk_abort:
5460
        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5461
                             " status.\nPlease check the instance.")
5462

    
5463

    
5464
class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances", "static"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = self.cfg.ExpandInstanceName(name)
        if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" % name)
        self.wanted_names.append(full_name)
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
    else:
      self.wanted_names = None
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]
    return

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    static = self.op.static
    if not static:
      self.cfg.SetDiskID(dev, instance.primary_node)
      dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
      if dev_pstatus.offline:
        dev_pstatus = None
      else:
        msg = dev_pstatus.RemoteFailMsg()
        if msg:
          raise errors.OpExecError("Can't compute disk status for %s: %s" %
                                   (instance.name, msg))
        dev_pstatus = dev_pstatus.payload
    else:
      dev_pstatus = None

    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode and not static:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
      if dev_sstatus.offline:
        dev_sstatus = None
      else:
        msg = dev_sstatus.RemoteFailMsg()
        if msg:
          raise errors.OpExecError("Can't compute disk status for %s: %s" %
                                   (instance.name, msg))
        dev_sstatus = dev_sstatus.payload
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

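    # assemble the per-device result; pstatus/sstatus are the blockdev_find
    # payloads from the primary/secondary node (None in static mode or if
    # the node is offline) and children are computed recursively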
    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      "mode": dev.mode,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}

    cluster = self.cfg.GetClusterInfo()

    for instance in self.wanted_instances:
      if not self.op.static:
        remote_info = self.rpc.call_instance_info(instance.primary_node,
                                                  instance.name,
                                                  instance.hypervisor)
        remote_info.Raise()
        remote_info = remote_info.data
        if remote_info and "state" in remote_info:
          remote_state = "up"
        else:
          remote_state = "down"
      else:
        remote_state = None
      if instance.admin_up:
        config_state = "up"
      else:
        config_state = "down"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

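      # combine the configuration data with the live state gathered above
      # (remote_state is None when only static information was requested)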
      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "hypervisor": instance.hypervisor,
        "network_port": instance.network_port,
        "hv_instance": instance.hvparams,
        "hv_actual": cluster.FillHV(instance),
        "be_instance": instance.beparams,
        "be_actual": cluster.FillBE(instance),
        }

      result[instance.name] = idict

    return result


class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def CheckArguments(self):
    if not hasattr(self.op, 'nics'):
      self.op.nics = []
    if not hasattr(self.op, 'disks'):
      self.op.disks = []
    if not hasattr(self.op, 'beparams'):
      self.op.beparams = {}
    if not hasattr(self.op, 'hvparams'):
      self.op.hvparams = {}
    self.op.force = getattr(self.op, "force", False)
    if not (self.op.nics or self.op.disks or
            self.op.hvparams or self.op.beparams):
      raise errors.OpPrereqError("No changes submitted")

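    # self.op.disks and self.op.nics are lists of (op, params) pairs, where
    # op is constants.DDM_ADD, constants.DDM_REMOVE or the index of an
    # existing device/nic to modify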
    # Disk validation
    disk_addremove = 0
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        disk_addremove += 1
        continue
      elif disk_op == constants.DDM_ADD:
        disk_addremove += 1
      else:
        if not isinstance(disk_op, int):
          raise errors.OpPrereqError("Invalid disk index")
      if disk_op == constants.DDM_ADD:
        mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
        if mode not in constants.DISK_ACCESS_SET:
          raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
        size = disk_dict.get('size', None)
        if size is None:
          raise errors.OpPrereqError("Required disk parameter size missing")
        try:
          size = int(size)
        except ValueError, err:
          raise errors.OpPrereqError("Invalid disk size parameter: %s" %
                                     str(err))
        disk_dict['size'] = size
      else:
        # modification of disk
        if 'size' in disk_dict:
          raise errors.OpPrereqError("Disk size change not possible, use"
                                     " grow-disk")

    if disk_addremove > 1:
      raise errors.OpPrereqError("Only one disk add or remove operation"
                                 " supported at a time")

    # NIC validation
    nic_addremove = 0
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        nic_addremove += 1
        continue
      elif nic_op == constants.DDM_ADD:
        nic_addremove += 1
      else:
        if not isinstance(nic_op, int):
          raise errors.OpPrereqError("Invalid nic index")

      # nic_dict should be a dict
      nic_ip = nic_dict.get('ip', None)
      if nic_ip is not None:
        if nic_ip.lower() == constants.VALUE_NONE:
          nic_dict['ip'] = None
        else:
          if not utils.IsValidIP(nic_ip):
            raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)

      if nic_op == constants.DDM_ADD:
        nic_bridge = nic_dict.get('bridge', None)
        if nic_bridge is None:
          nic_dict['bridge'] = self.cfg.GetDefBridge()
        nic_mac = nic_dict.get('mac', None)
        if nic_mac is None:
          nic_dict['mac'] = constants.VALUE_AUTO

      if 'mac' in nic_dict:
        nic_mac = nic_dict['mac']
        if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          if not utils.IsValidMac(nic_mac):
            raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
        if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
          raise errors.OpPrereqError("'auto' is not a valid MAC address when"
                                     " modifying an existing nic")

    if nic_addremove > 1:
      raise errors.OpPrereqError("Only one NIC add or remove operation"
                                 " supported at a time")

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if constants.BE_MEMORY in self.be_new:
      args['memory'] = self.be_new[constants.BE_MEMORY]
    if constants.BE_VCPUS in self.be_new:
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
    # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
    # information at all.
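    # build the NIC list as it will look after this modification, so the
    # hooks see the new values instead of the current configuration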
    if self.op.nics:
      args['nics'] = []
      nic_override = dict(self.op.nics)
      for idx, nic in enumerate(self.instance.nics):
        if idx in nic_override:
          this_nic_override = nic_override[idx]
        else:
          this_nic_override = {}
        if 'ip' in this_nic_override:
          ip = this_nic_override['ip']
        else:
          ip = nic.ip
        if 'bridge' in this_nic_override:
          bridge = this_nic_override['bridge']
        else:
          bridge = nic.bridge
        if 'mac' in this_nic_override:
          mac = this_nic_override['mac']
        else:
          mac = nic.mac
        args['nics'].append((ip, bridge, mac))
      if constants.DDM_ADD in nic_override:
        ip = nic_override[constants.DDM_ADD].get('ip', None)
        bridge = nic_override[constants.DDM_ADD]['bridge']
        mac = nic_override[constants.DDM_ADD]['mac']
        args['nics'].append((ip, bridge, mac))
      elif constants.DDM_REMOVE in nic_override:
        del args['nics'][-1]

    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    force = self.force = self.op.force

    # checking the new params on the primary/secondary nodes

    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    pnode = instance.primary_node
    nodelist = list(instance.all_nodes)

    # hvparams processing
    if self.op.hvparams:
      i_hvdict = copy.deepcopy(instance.hvparams)
      for key, val in self.op.hvparams.iteritems():
        if val == constants.VALUE_DEFAULT:
          try:
            del i_hvdict[key]
          except KeyError:
            pass
        else:
          i_hvdict[key] = val
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
      hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
                                i_hvdict)
      # local check
      hypervisor.GetHypervisor(
        instance.hypervisor).CheckParameterSyntax(hv_new)
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
      self.hv_new = hv_new # the new actual values
      self.hv_inst = i_hvdict # the new dict (without defaults)
    else:
      self.hv_new = self.hv_inst = {}

    # beparams processing
    if self.op.beparams:
      i_bedict = copy.deepcopy(instance.beparams)
      for key, val in self.op.beparams.iteritems():
        if val == constants.VALUE_DEFAULT:
          try:
            del i_bedict[key]
          except KeyError:
            pass
        else:
          i_bedict[key] = val
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
      be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                i_bedict)
      self.be_new = be_new # the new actual values
      self.be_inst = i_bedict # the new dict (without defaults)
    else:
      self.be_new = self.be_inst = {}

    self.warn = []

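    # when changing the memory size and not forcing, check that the increase
    # over what the instance currently uses still fits into the primary
    # node's free memory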
    if constants.BE_MEMORY in self.op.beparams and not self.force:
      mem_check_list = [pnode]
      if be_new[constants.BE_AUTO_BALANCE]:
        # either we changed auto_balance to yes or it was from before
        mem_check_list.extend(instance.secondary_nodes)
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
                                                  instance.hypervisor)
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
                                         instance.hypervisor)
      if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s" % pnode)
      else:
        if not instance_info.failed and instance_info.data:
          current_mem = instance_info.data['memory']
        else:
          # Assume instance not running
          # (there is a slight race condition here, but it's not very probable,
          # and we have no other way to check)
          current_mem = 0
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
                    nodeinfo[pnode].data['memory_free'])
        if miss_mem > 0:
          raise errors.OpPrereqError("This change will prevent the instance"
                                     " from starting, due to %d MB of memory"
                                     " missing on its primary node" % miss_mem)

      if be_new[constants.BE_AUTO_BALANCE]:
        for node, nres in nodeinfo.iteritems():
          if node not in instance.secondary_nodes:
            continue
          if nres.failed or not isinstance(nres.data, dict):
            self.warn.append("Can't get info from secondary node %s" % node)
          elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
            self.warn.append("Not enough memory to failover instance to"
                             " secondary node %s" % node)

    # NIC processing
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        if not instance.nics:
          raise errors.OpPrereqError("Instance has no NICs, cannot remove")
        continue
      if nic_op != constants.DDM_ADD:
        # an existing nic
        if nic_op < 0 or nic_op >= len(instance.nics):
          raise errors.OpPrereqError("Invalid NIC index %s, valid values"
                                     " are 0 to %d" %
                                     (nic_op, len(instance.nics)))
      if 'bridge' in nic_dict:
        nic_bridge = nic_dict['bridge']
        if nic_bridge is None:
          raise errors.OpPrereqError('Cannot set the nic bridge to None')
        if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
          msg = ("Bridge '%s' doesn't exist on one of"
                 " the instance nodes" % nic_bridge)
          if self.force:
            self.warn.append(msg)
          else:
            raise errors.OpPrereqError(msg)
      if 'mac' in nic_dict:
        nic_mac = nic_dict['mac']
        if nic_mac is None:
          raise errors.OpPrereqError('Cannot set the nic mac to None')
        elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          # otherwise generate the mac
          nic_dict['mac'] = self.cfg.GenerateMAC()
        else:
          # or validate/reserve the current one
          if self.cfg.IsMacInUse(nic_mac):
            raise errors.OpPrereqError("MAC address %s already in use"
                                       " in cluster" % nic_mac)

    # DISK processing
    if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Disk operations not supported for"
                                 " diskless instances")
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        if len(instance.disks) == 1:
          raise errors.OpPrereqError("Cannot remove the last disk of"
                                     " an instance")
        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
        ins_l = ins_l[pnode]
        if ins_l.failed or not isinstance(ins_l.data, list):
          raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
        if instance.name in ins_l.data:
          raise errors.OpPrereqError("Instance is running, can't remove"
                                     " disks.")

      if (disk_op == constants.DDM_ADD and
          len(instance.disks) >= constants.MAX_DISKS):
        raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
                                   " add more" % constants.MAX_DISKS)
      if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
        # an existing disk
        if disk_op < 0 or disk_op >= len(instance.disks):
          raise errors.OpPrereqError("Invalid disk index %s, valid values"
                                     " are 0 to %d" %
                                     (disk_op, len(instance.disks)))

    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    # Process here the warnings from CheckPrereq, as we don't have a
    # feedback_fn there.
    for warn in self.warn:
      feedback_fn("WARNING: %s" % warn)

    result = []
    instance = self.instance
    # disk changes
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        # remove the last disk
        device = instance.disks.pop()
        device_idx = len(instance.disks)
        for node, disk in device.ComputeNodeTree(instance.primary_node):
          self.cfg.SetDiskID(disk, node)
          msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
          if msg:
            self.LogWarning("Could not remove disk/%d on node %s: %s,"
                            " continuing anyway", device_idx, node, msg)
        result.append(("disk/%d" % device_idx, "remove"))
      elif disk_op == constants.DDM_ADD:
        # add a new disk
        if instance.disk_template == constants.DT_FILE:
          file_driver, file_path = instance.disks[0].logical_id
          file_path = os.path.dirname(file_path)
        else:
          file_driver = file_path = None
        disk_idx_base = len(instance.disks)
        new_disk = _GenerateDiskTemplate(self,
                                         instance.disk_template,
                                         instance.name, instance.primary_node,
                                         instance.secondary_nodes,
                                         [disk_dict],
                                         file_path,
                                         file_driver,
                                         disk_idx_base)[0]
        instance.disks.append(new_disk)
        info = _GetInstanceInfoText(instance)

        logging.info("Creating volume %s for instance %s",
                     new_disk.iv_name, instance.name)
        # Note: this needs to be kept in sync with _CreateDisks
        #HARDCODE
        for node in instance.all_nodes:
          f_create = node == instance.primary_node
          try:
            _CreateBlockDev(self, node, instance, new_disk,
                            f_create, info, f_create)
          except errors.OpExecError, err:
            self.LogWarning("Failed to create volume %s (%s) on"
                            " node %s: %s",
                            new_disk.iv_name, new_disk, node, err)
        result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
                       (new_disk.size, new_disk.mode)))
      else:
        # change a given disk
        instance.disks[disk_op].mode = disk_dict['mode']
        result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
    # NIC changes
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        # remove the last nic
        del instance.nics[-1]
        result.append(("nic.%d" % len(instance.nics), "remove"))
      elif nic_op == constants.DDM_ADD:
        # mac and bridge should be set, by now
        mac = nic_dict['mac']
        bridge = nic_dict['bridge']
        new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
                              bridge=bridge)
        instance.nics.append(new_nic)
        result.append(("nic.%d" % (len(instance.nics) - 1),
                       "add:mac=%s,ip=%s,bridge=%s" %
                       (new_nic.mac, new_nic.ip, new_nic.bridge)))
      else:
        # change a given nic
        for key in 'mac', 'ip', 'bridge':
          if key in nic_dict:
            setattr(instance.nics[nic_op], key, nic_dict[key])
            result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))

    # hvparams changes
    if self.op.hvparams:
      instance.hvparams = self.hv_inst
      for key, val in self.op.hvparams.iteritems():
        result.append(("hv/%s" % key, val))

    # beparams changes
    if self.op.beparams:
      instance.beparams = self.be_inst
      for key, val in self.op.beparams.iteritems():
        result.append(("be/%s" % key, val))

    self.cfg.Update(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = ['nodes']
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    @rtype: dict
    @return: a dictionary with the structure node->(export-list)
        where export-list is a list of the instances exported on
        that node.

    """
    rpcresult = self.rpc.call_export_list(self.nodes)
    result = {}
    for node in rpcresult:
      if rpcresult[node].failed:
        result[node] = False
      else:
        result[node] = rpcresult[node].data

    return result


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
          "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

    self.dst_node = self.cfg.GetNodeInfo(
      self.cfg.ExpandNodeName(self.op.target_node))

    if self.dst_node is None:
      # This is a wrong node name, not a non-locked node
      raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
    _CheckNodeOnline(self, self.dst_node.name)
    _CheckNodeNotDrained(self, self.dst_node.name)

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      result = self.rpc.call_instance_shutdown(src_node, instance)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, src_node, msg))

    vgname = self.cfg.GetVGName()

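    # snap_disks keeps one entry per instance disk; failed snapshots are
    # recorded as False so that disk indices stay aligned in the export
    # loop below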
    snap_disks = []

    # set the disks ID correctly since call_instance_start needs the
    # correct drbd minor to create the symlinks
    for disk in instance.disks:
      self.cfg.SetDiskID(disk, src_node)

    try:
      for disk in instance.disks:
        # new_dev_name will be a snapshot of an lvm leaf of the one we passed
        new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
        if new_dev_name.failed or not new_dev_name.data:
          self.LogWarning("Could not snapshot block device %s on node %s",
                          disk.logical_id[1], src_node)
          snap_disks.append(False)
        else:
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                 logical_id=(vgname, new_dev_name.data),
                                 physical_id=(vgname, new_dev_name.data),
                                 iv_name=disk.iv_name)
          snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.admin_up:
        result = self.rpc.call_instance_start(src_node, instance, None)
        msg = result.RemoteFailMsg()
        if msg:
          _ShutdownInstanceDisks(self, instance)
          raise errors.OpExecError("Could not start instance: %s" % msg)

    # TODO: check for size

    cluster_name = self.cfg.GetClusterName()
    for idx, dev in enumerate(snap_disks):
      if dev:
        result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                               instance, cluster_name, idx)
        if result.failed or not result.data:
          self.LogWarning("Could not export block device %s from node %s to"
                          " node %s", dev.logical_id[1], src_node,
                          dst_node.name)
        msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
        if msg:
          self.LogWarning("Could not remove snapshot block device %s from node"
                          " %s: %s", dev.logical_id[1], src_node, msg)

    result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
    if result.failed or not result.data:
      self.LogWarning("Could not finalize export for instance %s on node %s",
                      instance.name, dst_node.name)

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed, the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if exportlist[node].failed:
          continue
        if instance.name in exportlist[node].data:
          if not self.rpc.call_export_remove(node, instance.name):
            self.LogWarning("Could not remove older export for instance %s"
                            " on node %s", instance.name, node)


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.
    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    exportlist = self.rpc.call_export_list(self.acquired_locks[
      locking.LEVEL_NODE])
    found = False
    for node in exportlist:
      if exportlist[node].failed:
        self.LogWarning("Failed to query node %s, continuing" % node)
        continue
      if instance_name in exportlist[node].data:
        found = True
        result = self.rpc.call_export_remove(node, instance_name)
        if result.failed or not result.data:
          logging.error("Could not remove export for instance %s"
                        " on node %s", instance_name, node)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
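    # cluster tags need no extra locking; only node and instance targets
    # lock the corresponding object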
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        node_result.Raise()
        if not node_result.data:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result.data))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus", "hypervisor",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, lu, mode, name, **kwargs):
    self.lu = lu
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.hypervisor = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.lu.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": 1,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      hypervisor_name = self.hypervisor
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor

    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
                                           hypervisor_name)
    node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
                       cluster_info.enabled_hypervisors)
    for nname, nresult in node_data.items():
      # first fill in static (config-based) values
      ninfo = cfg.GetNodeInfo(nname)
      pnr = {
        "tags": list(ninfo.GetTags()),
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "offline": ninfo.offline,
        "drained": ninfo.drained,
        "master_candidate": ninfo.master_candidate,
        }

      if not ninfo.offline:
        nresult.Raise()
        if not isinstance(nresult.data, dict):
          raise errors.OpExecError("Can't get data for node %s" % nname)
        remote_info = nresult.data
        for attr in ['memory_total', 'memory_free', 'memory_dom0',
                     'vg_size', 'vg_free', 'cpu_total']:
          if attr not in remote_info:
            raise errors.OpExecError("Node '%s' didn't return attribute"
                                     " '%s'" % (nname, attr))
          try:
            remote_info[attr] = int(remote_info[attr])
          except ValueError, err:
            raise errors.OpExecError("Node '%s' returned invalid value"
                                     " for '%s': %s" % (nname, attr, err))
        # compute memory used by primary instances
        i_p_mem = i_p_up_mem = 0
        for iinfo, beinfo in i_list:
          if iinfo.primary_node == nname:
            i_p_mem += beinfo[constants.BE_MEMORY]
            if iinfo.name not in node_iinfo[nname].data:
              i_used_mem = 0
            else:
              i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
            i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
            remote_info['memory_free'] -= max(0, i_mem_diff)

            if iinfo.admin_up:
              i_p_up_mem += beinfo[constants.BE_MEMORY]

        # compute memory used by instances
        pnr_dyn = {
          "total_memory": remote_info['memory_total'],
          "reserved_memory": remote_info['memory_dom0'],
          "free_memory": remote_info['memory_free'],
          "total_disk": remote_info['vg_size'],
          "free_disk": remote_info['vg_free'],
          "total_cpus": remote_info['cpu_total'],
          "i_pri_memory": i_p_mem,
          "i_pri_up_memory": i_p_up_mem,
          }
        pnr.update(pnr_dyn)

      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "admin_up": iinfo.admin_up,
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data

    disk_space = _ComputeDiskSize(self.disk_template, self.disks)

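    # network-mirrored disk templates (e.g. DRBD) need two target nodes,
    # all other templates only one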
    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.lu.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance has not exactly one secondary node")

    self.required_nodes = 1
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.lu.rpc.call_iallocator_runner
    data = self.in_text

    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
    result.Raise()

    if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

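    # the runner is expected to return a 4-tuple: run status code, stdout,
    # stderr and a failure message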
    rcode, stdout, stderr, fail = result.data

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

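    # a well-formed result looks, for example, like:
    #   {"success": True, "info": "allocation successful",
    #    "nodes": ["node1.example.com", "node2.example.com"]}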
    for key in "success", "info", "nodes":
6788
      if key not in rdict:
6789
        raise errors.OpExecError("Can't parse iallocator results:"
6790
                                 " missing key '%s'" % key)
6791
      setattr(self, key, rdict[key])
6792

    
6793
    if not isinstance(rdict["nodes"], list):
6794
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6795
                               " is not a list")
6796
    self.out_data = rdict
6797

    
6798

    
6799
class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
      if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    else:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result