#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import time
import tempfile
import re
import platform
import logging
import copy
import random

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer
from ganeti import ssconf


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    self.CheckArguments()

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensuring the
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods can no longer worry about missing parameters.

    """
    pass

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not be prefixed with 'GANETI_', as this
    will be handled in the hooks runner. Also note additional keys will
    be added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    If no nodes are needed, an empty list (and not None) should be returned.

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged, but any LU can override it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]


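# Illustrative sketch (not part of the original module): how a concurrent LU
# would typically wire the locking callbacks described in the docstrings
# above. The class name and opcode are hypothetical; the helpers and constants
# used (_ExpandAndLockInstance, _LockInstancesNodes, constants.LOCKS_REPLACE,
# locking.LEVEL_NODE) are the ones defined or referenced in this module.
#
#   class LUExampleInstanceOp(LogicalUnit):
#     _OP_REQP = ["instance_name"]
#     REQ_BGL = False
#
#     def ExpandNames(self):
#       # canonicalize self.op.instance_name and declare the instance lock
#       self._ExpandAndLockInstance()
#       # node locks are computed later, once the instance lock is held
#       self.needed_locks[locking.LEVEL_NODE] = []
#       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
#
#     def DeclareLocks(self, level):
#       if level == locking.LEVEL_NODE:
#         self._LockInstancesNodes()

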
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.OpPrereqError: if the nodes parameter is wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
  return wanted


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta))


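# Illustrative sketch (not part of the original module): typical use of
# _CheckOutputFields from a query-style LU's CheckPrereq to validate the
# opcode's "output_fields" parameter. The field names shown are placeholders.
#
#   _CheckOutputFields(static=utils.FieldSet("name", "pinst_cnt"),
#                      dynamic=utils.FieldSet("dfree", "mfree"),
#                      selected=self.op.output_fields)
#   # raises errors.OpPrereqError("Unknown output fields selected: ...")
#   # if any selected field is in neither set

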
def _CheckBooleanOpField(op, name):
  """Validates boolean opcode parameters.

  This will ensure that an opcode parameter is either a boolean value,
  or None (but that it always exists).

  """
  val = getattr(op, name, None)
  if not (val is None or isinstance(val, bool)):
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
                               (name, str(val)))
  setattr(op, name, val)


def _CheckNodeOnline(lu, node):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is offline

  """
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("Can't use offline node %s" % node)


def _CheckNodeNotDrained(lu, node):
  """Ensure that a given node is not drained.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is drained

  """
  if lu.cfg.GetNodeInfo(node).drained:
    raise errors.OpPrereqError("Can't use drained node %s" % node)


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks):
  """Builds instance related env variables for hooks

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, bridge, mac) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @type disks: list
  @param disks: the list of (size, mode) pairs
  @rtype: dict
  @return: the hook environment for this instance

  """
  if status:
    str_status = "up"
  else:
    str_status = "down"
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_MAC" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  if disks:
    disk_count = len(disks)
    for idx, (size, mode) in enumerate(disks):
      env["INSTANCE_DISK%d_SIZE" % idx] = size
      env["INSTANCE_DISK%d_MODE" % idx] = mode
  else:
    disk_count = 0

  env["INSTANCE_DISK_COUNT"] = disk_count

  return env


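# Illustrative sketch (not part of the original module): the environment
# produced by _BuildInstanceHookEnv for a hypothetical one-NIC, one-disk
# instance. The values are examples only; the key names come straight from
# the function above (the hooks runner later adds the 'GANETI_' prefix).
#
#   _BuildInstanceHookEnv("inst1.example.tld", "node1.example.tld",
#                         ["node2.example.tld"], "debian-etch", True,
#                         128, 1,
#                         [("198.51.100.10", "xen-br0", "aa:00:00:11:22:33")],
#                         "drbd8", [(10240, "rw")])
#   # =>
#   # {"OP_TARGET": "inst1.example.tld", "INSTANCE_NAME": "inst1.example.tld",
#   #  "INSTANCE_PRIMARY": "node1.example.tld",
#   #  "INSTANCE_SECONDARIES": "node2.example.tld",
#   #  "INSTANCE_OS_TYPE": "debian-etch", "INSTANCE_STATUS": "up",
#   #  "INSTANCE_MEMORY": 128, "INSTANCE_VCPUS": 1,
#   #  "INSTANCE_DISK_TEMPLATE": "drbd8", "INSTANCE_NIC_COUNT": 1,
#   #  "INSTANCE_NIC0_IP": "198.51.100.10", "INSTANCE_NIC0_BRIDGE": "xen-br0",
#   #  "INSTANCE_NIC0_MAC": "aa:00:00:11:22:33", "INSTANCE_DISK_COUNT": 1,
#   #  "INSTANCE_DISK0_SIZE": 10240, "INSTANCE_DISK0_MODE": "rw"}

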
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.admin_up,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
    'disk_template': instance.disk_template,
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _AdjustCandidatePool(lu):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool()
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               ", ".join(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def _CheckInstanceBridgesExist(lu, instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
  result.Raise()
  if not result.data:
    raise errors.OpPrereqError("One or more target bridges %s does not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise()
    if not result.data:
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
                  node_result, feedback_fn, master_files,
                  drbd_map, vg_name):
    """Run multiple tests against a node.

    Test list:

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type nodeinfo: L{objects.Node}
    @param nodeinfo: the node to check
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @param node_result: the results from the node
    @param feedback_fn: function used to accumulate results
    @param master_files: list of files that only masters should have
    @param drbd_map: the used drbd minors for this node, in
        form of minor: (instance, must_exist) which correspond to instances
        and their running status
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())

    """
    node = nodeinfo.name

    # main result, node_result should be a non-empty dict
    if not node_result or not isinstance(node_result, dict):
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
      return True

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = node_result.get('version', None)
    if not (remote_version and isinstance(remote_version, (list, tuple)) and
            len(remote_version) == 2):
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version[0]:
      feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
                  " node %s %s" % (local_version, node, remote_version[0]))
      return True

    # node seems compatible, we can actually try to look into its results

    bad = False

    # full package version
    if constants.RELEASE_VERSION != remote_version[1]:
      feedback_fn("  - WARNING: software version mismatch: master %s,"
                  " node %s %s" %
                  (constants.RELEASE_VERSION, node, remote_version[1]))

    # checks vg existence and size > 20G
    if vg_name is not None:
      vglist = node_result.get(constants.NV_VGLIST, None)
      if not vglist:
        feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                        (node,))
        bad = True
      else:
        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
          bad = True

    # checks config file checksum

    remote_cksum = node_result.get(constants.NV_FILELIST, None)
    if not isinstance(remote_cksum, dict):
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      for file_name in file_list:
        node_is_mc = nodeinfo.master_candidate
        must_have_file = file_name not in master_files
        if file_name not in remote_cksum:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
          else:
            # not candidate and this is not a must-have file
            bad = True
            feedback_fn("  - ERROR: non master-candidate has old/wrong file"
                        " '%s'" % file_name)
        else:
          # all good, except non-master/non-must have combination
          if not node_is_mc and not must_have_file:
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
                        " candidates" % file_name)

    # checks ssh to any

    if constants.NV_NODELIST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result[constants.NV_NODELIST]:
        bad = True
        for node in node_result[constants.NV_NODELIST]:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result[constants.NV_NODELIST][node]))

    if constants.NV_NODENETTEST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result[constants.NV_NODENETTEST]:
        bad = True
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result[constants.NV_NODENETTEST][node]))

    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))

    # check used drbd list
    if vg_name is not None:
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
      if not isinstance(used_minors, (tuple, list)):
        feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
                    str(used_minors))
      else:
        for minor, (iname, must_exist) in drbd_map.items():
          if minor not in used_minors and must_exist:
            feedback_fn("  - ERROR: drbd minor %d of instance %s is"
                        " not active" % (minor, iname))
            bad = True
        for minor in used_minors:
          if minor not in drbd_map:
            feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
                        minor)
            bad = True

    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn, n_offline):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      if node in n_offline:
        # ignore missing volumes on offline nodes
        continue
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if instanceconfig.admin_up:
      if ((node_current not in node_instance or
          not instance in node_instance[node_current]) and
          node_current not in n_offline):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all the instances it would have to run, should a
      # single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase, and their failure causes
    the output to be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
      }
    for node in self.cfg.GetAllNodesInfo().values():
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())

    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
                        for iname in instancelist)
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    n_offline = [] # List of offline nodes
    n_drained = [] # List of nodes being drained
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    master_files = [constants.CLUSTER_CONF_FILE]

    file_names = ssconf.SimpleStore().GetFileList()
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.RAPI_CERT_FILE)
    file_names.extend(master_files)

    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    node_verify_param = {
      constants.NV_FILELIST: file_names,
      constants.NV_NODELIST: [node.name for node in nodeinfo
                              if not node.offline],
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                  node.secondary_ip) for node in nodeinfo
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      }
    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_DRBDLIST] = None
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())

    cluster = self.cfg.GetClusterInfo()
    master_node = self.cfg.GetMasterNode()
    all_drbd_map = self.cfg.ComputeDRBDMap()

    for node_i in nodeinfo:
      node = node_i.name
      nresult = all_nvinfo[node].data

      if node_i.offline:
        feedback_fn("* Skipping offline node %s" % (node,))
        n_offline.append(node)
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained.append(node)
      else:
        ntype = "regular"
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      if all_nvinfo[node].failed or not isinstance(nresult, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_drbd = {}
      for minor, instance in all_drbd_map[node].items():
        if instance not in instanceinfo:
          feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
                      instance)
          # ghost instance should not be running, but otherwise we
          # don't give double warnings (both ghost instance and
          # unallocated minor in use)
          node_drbd[minor] = (instance, False)
        else:
          instance = instanceinfo[instance]
          node_drbd[minor] = (instance.name, instance.admin_up)
      result = self._VerifyNode(node_i, file_names, local_checksums,
                                nresult, feedback_fn, master_files,
                                node_drbd, vg_name)
      bad = bad or result

      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
      if vg_name is None:
        node_volume[node] = {}
      elif isinstance(lvdata, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, utils.SafeEncode(lvdata)))
        bad = True
        node_volume[node] = {}
      elif not isinstance(lvdata, dict):
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = lvdata

      # node_instance
      idata = nresult.get(constants.NV_INSTANCELIST, None)
      if not isinstance(idata, list):
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
                    (node,))
        bad = True
        continue

      node_instance[node] = idata

      # node_info
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
        # FIXME: devise a free space model for file based instances as well
        if vg_name is not None:
          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
      except ValueError:
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = instanceinfo[instance]
      result =  self._VerifyInstance(instance, inst_config, node_volume,
                                     node_instance, feedback_fn, n_offline)
      bad = bad or result
      inst_nodes_offline = []

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      elif pnode not in n_offline:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      if pnode in n_offline:
        inst_nodes_offline.append(pnode)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        elif snode not in n_offline:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))
          bad = True
        if snode in n_offline:
          inst_nodes_offline.append(snode)

      if inst_nodes_offline:
        # warn that the instance lives on offline nodes, and set bad=True
        feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
                    ", ".join(inst_nodes_offline))
        bad = True

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res.failed or res.data is False or not isinstance(res.data, list):
            if res.offline:
              # no need to warn or set fail return value
              continue
            feedback_fn("    Communication failure in hooks execution")
            lu_result = 1
            continue
          for script, hkr, output in res.data:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (not inst.admin_up or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
      # node_volume
      lvs = node_lvs[node]
      if lvs.failed:
        if not lvs.offline:
          self.LogWarning("Connection to node %s failed: %s" %
                          (node, lvs.data))
        continue
      lvs = lvs.data
      if isinstance(lvs, basestring):
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logging.warning("Connection to node %s failed or invalid data"
                        " returned", node)
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


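# Illustrative sketch (not part of the original module): the shape of the
# value returned by LUVerifyDisks.Exec above. All node, volume and instance
# names are placeholders.
#
#   (res_nodes, res_nlvm, res_instances, res_missing) = (
#     ["node3.example.tld"],                     # nodes that returned bad data
#     {"node2.example.tld": "lvs: error text"},  # per-node LVM error strings
#     ["inst1.example.tld"],                     # instances with offline LVs
#     {"inst2.example.tld": [("node1.example.tld", "<lv name>")]},
#                                                # instances with missing LVs
#   )

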
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    if result.failed or not result.data:
      raise errors.OpExecError("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetNodeList()
      try:
        node_list.remove(master)
      except ValueError:
        pass
      result = self.rpc.call_upload_file(node_list,
                                         constants.SSH_KNOWN_HOSTS_FILE)
      for to_node, to_result in result.iteritems():
        if to_result.failed or not to_result.data:
          logging.error("Copy of file %s to node %s failed",
                        constants.SSH_KNOWN_HOSTS_FILE, to_node)

    finally:
      result = self.rpc.call_node_start_master(master, False)
      if result.failed or not result.data:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV


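# Illustrative sketch (not part of the original module): _RecursiveCheckIfLVMBased
# walks a disk tree, so a DRBD device whose children are logical volumes
# reports True, while a plain file-based disk reports False. The objects
# below are hypothetical, built only to show the recursion.
#
#   lv_child = objects.Disk(dev_type=constants.LD_LV, size=1024,
#                           logical_id=("xenvg", "example-lv"), children=[])
#   drbd_disk = objects.Disk(dev_type=constants.LD_DRBD8, size=1024,
#                            children=[lv_child, lv_child])
#   file_disk = objects.Disk(dev_type=constants.LD_FILE, size=1024,
#                            children=[])
#
#   _RecursiveCheckIfLVMBased(drbd_disk)   # True (an LD_LV child was found)
#   _RecursiveCheckIfLVMBased(file_disk)   # False

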
class LUSetClusterParams(LogicalUnit):
1404
  """Change the parameters of the cluster.
1405

1406
  """
1407
  HPATH = "cluster-modify"
1408
  HTYPE = constants.HTYPE_CLUSTER
1409
  _OP_REQP = []
1410
  REQ_BGL = False
1411

    
1412
  def CheckParameters(self):
1413
    """Check parameters
1414

1415
    """
1416
    if not hasattr(self.op, "candidate_pool_size"):
1417
      self.op.candidate_pool_size = None
1418
    if self.op.candidate_pool_size is not None:
1419
      try:
1420
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1421
      except ValueError, err:
1422
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1423
                                   str(err))
1424
      if self.op.candidate_pool_size < 1:
1425
        raise errors.OpPrereqError("At least one master candidate needed")
1426

    
1427
  def ExpandNames(self):
1428
    # FIXME: in the future maybe other cluster params won't require checking on
1429
    # all nodes to be modified.
1430
    self.needed_locks = {
1431
      locking.LEVEL_NODE: locking.ALL_SET,
1432
    }
1433
    self.share_locks[locking.LEVEL_NODE] = 1
1434

    
1435
  def BuildHooksEnv(self):
1436
    """Build hooks env.
1437

1438
    """
1439
    env = {
1440
      "OP_TARGET": self.cfg.GetClusterName(),
1441
      "NEW_VG_NAME": self.op.vg_name,
1442
      }
1443
    mn = self.cfg.GetMasterNode()
1444
    return env, [mn], [mn]
1445

    
1446
  def CheckPrereq(self):
1447
    """Check prerequisites.
1448

1449
    This checks whether the given params don't conflict and
1450
    if the given volume group is valid.
1451

1452
    """
1453
    if self.op.vg_name is not None and not self.op.vg_name:
1454
      instances = self.cfg.GetAllInstancesInfo().values()
1455
      for inst in instances:
1456
        for disk in inst.disks:
1457
          if _RecursiveCheckIfLVMBased(disk):
1458
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1459
                                       " lvm-based instances exist")
1460

    
1461
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1462

    
1463
    # if vg_name not None, checks given volume group on all nodes
1464
    if self.op.vg_name:
1465
      vglist = self.rpc.call_vg_list(node_list)
1466
      for node in node_list:
1467
        if vglist[node].failed:
1468
          # ignoring down node
1469
          self.LogWarning("Node %s unreachable/error, ignoring" % node)
1470
          continue
1471
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1472
                                              self.op.vg_name,
1473
                                              constants.MIN_VG_SIZE)
1474
        if vgstatus:
1475
          raise errors.OpPrereqError("Error on node '%s': %s" %
1476
                                     (node, vgstatus))
1477

    
1478
    self.cluster = cluster = self.cfg.GetClusterInfo()
1479
    # validate beparams changes
1480
    if self.op.beparams:
1481
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1482
      self.new_beparams = cluster.FillDict(
1483
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1484

    
1485
    # hypervisor list/parameters
1486
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
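    # note: FillDict gives us a copy of the current cluster-wide hvparams;
    # the per-hypervisor updates below are merged key by key, so any
    # parameter not mentioned in the request keeps its existing value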
1487
    if self.op.hvparams:
1488
      if not isinstance(self.op.hvparams, dict):
1489
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1490
      for hv_name, hv_dict in self.op.hvparams.items():
1491
        if hv_name not in self.new_hvparams:
1492
          self.new_hvparams[hv_name] = hv_dict
1493
        else:
1494
          self.new_hvparams[hv_name].update(hv_dict)
1495

    
1496
    if self.op.enabled_hypervisors is not None:
1497
      self.hv_list = self.op.enabled_hypervisors
1498
    else:
1499
      self.hv_list = cluster.enabled_hypervisors
1500

    
1501
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1502
      # either the enabled list has changed, or the parameters have, validate
1503
      for hv_name, hv_params in self.new_hvparams.items():
1504
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1505
            (self.op.enabled_hypervisors and
1506
             hv_name in self.op.enabled_hypervisors)):
1507
          # either this is a new hypervisor, or its parameters have changed
1508
          hv_class = hypervisor.GetHypervisor(hv_name)
1509
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1510
          hv_class.CheckParameterSyntax(hv_params)
1511
          _CheckHVParams(self, node_list, hv_name, hv_params)
1512

    
1513
  def Exec(self, feedback_fn):
1514
    """Change the parameters of the cluster.
1515

1516
    """
1517
    if self.op.vg_name is not None:
1518
      if self.op.vg_name != self.cfg.GetVGName():
1519
        self.cfg.SetVGName(self.op.vg_name)
1520
      else:
1521
        feedback_fn("Cluster LVM configuration already in desired"
1522
                    " state, not changing")
1523
    if self.op.hvparams:
1524
      self.cluster.hvparams = self.new_hvparams
1525
    if self.op.enabled_hypervisors is not None:
1526
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1527
    if self.op.beparams:
1528
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1529
    if self.op.candidate_pool_size is not None:
1530
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1531

    
1532
    self.cfg.Update(self.cluster)
1533

    
1534
    # we want to update nodes after the cluster so that if any errors
1535
    # happen, we have recorded and saved the cluster info
1536
    if self.op.candidate_pool_size is not None:
1537
      _AdjustCandidatePool(self)
1538

    
1539

    
1540
class LURedistributeConfig(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo())


def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1568
  """Sleep and poll for an instance's disk to sync.
1569

1570
  """
1571
  if not instance.disks:
1572
    return True
1573

    
1574
  if not oneshot:
1575
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1576

    
1577
  node = instance.primary_node
1578

    
1579
  for dev in instance.disks:
1580
    lu.cfg.SetDiskID(dev, node)
1581

    
1582
  retries = 0
1583
  while True:
1584
    max_time = 0
1585
    done = True
1586
    cumul_degraded = False
1587
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1588
    if rstats.failed or not rstats.data:
1589
      lu.LogWarning("Can't get any data from node %s", node)
1590
      retries += 1
1591
      if retries >= 10:
1592
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1593
                                 " aborting." % node)
1594
      time.sleep(6)
1595
      continue
1596
    rstats = rstats.data
1597
    retries = 0
1598
    for i, mstat in enumerate(rstats):
1599
      if mstat is None:
1600
        lu.LogWarning("Can't compute data for node %s/%s",
1601
                           node, instance.disks[i].iv_name)
1602
        continue
1603
      # we ignore the ldisk parameter
1604
      perc_done, est_time, is_degraded, _ = mstat
1605
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
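      # a device counts as degraded here only if it reports a degraded
      # state while no resync is in progress (perc_done is None); devices
      # that are still syncing are expected to recover on their own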
1606
      if perc_done is not None:
1607
        done = False
1608
        if est_time is not None:
1609
          rem_time = "%d estimated seconds remaining" % est_time
1610
          max_time = est_time
1611
        else:
1612
          rem_time = "no time estimate"
1613
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1614
                        (instance.disks[i].iv_name, perc_done, rem_time))
1615
    if done or oneshot:
1616
      break
1617

    
1618
    time.sleep(min(60, max_time))
1619

    
1620
  if done:
1621
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1622
  return not cumul_degraded
1623

    
1624

    
1625
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1626
  """Check that mirrors are not degraded.
1627

1628
  The ldisk parameter, if True, will change the test from the
1629
  is_degraded attribute (which represents overall non-ok status for
1630
  the device(s)) to the ldisk (representing the local storage status).
1631

1632
  """
1633
  lu.cfg.SetDiskID(dev, node)
1634
  if ldisk:
1635
    idx = 6
1636
  else:
1637
    idx = 5
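  # idx selects the field of the status tuple returned by blockdev_find:
  # position 5 is assumed to hold the overall is_degraded flag and
  # position 6 the local disk (ldisk) status, matching the docstring above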
1638

    
1639
  result = True
1640
  if on_primary or dev.AssembleOnSecondary():
1641
    rstats = lu.rpc.call_blockdev_find(node, dev)
1642
    msg = rstats.RemoteFailMsg()
1643
    if msg:
1644
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1645
      result = False
1646
    elif not rstats.payload:
1647
      lu.LogWarning("Can't find disk on node %s", node)
1648
      result = False
1649
    else:
1650
      result = result and (not rstats.payload[idx])
1651
  if dev.children:
1652
    for child in dev.children:
1653
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1654

    
1655
  return result
1656

    
1657

    
1658
class LUDiagnoseOS(NoHooksLU):
1659
  """Logical unit for OS diagnose/query.
1660

1661
  """
1662
  _OP_REQP = ["output_fields", "names"]
1663
  REQ_BGL = False
1664
  _FIELDS_STATIC = utils.FieldSet()
1665
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1666

    
1667
  def ExpandNames(self):
1668
    if self.op.names:
1669
      raise errors.OpPrereqError("Selective OS query not supported")
1670

    
1671
    _CheckOutputFields(static=self._FIELDS_STATIC,
1672
                       dynamic=self._FIELDS_DYNAMIC,
1673
                       selected=self.op.output_fields)
1674

    
1675
    # Lock all nodes, in shared mode
1676
    self.needed_locks = {}
1677
    self.share_locks[locking.LEVEL_NODE] = 1
1678
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1679

    
1680
  def CheckPrereq(self):
1681
    """Check prerequisites.
1682

1683
    """
1684

    
1685
  @staticmethod
1686
  def _DiagnoseByOS(node_list, rlist):
1687
    """Remaps a per-node return list into an a per-os per-node dictionary
1688

1689
    @param node_list: a list with the names of all nodes
1690
    @param rlist: a map with node names as keys and OS objects as values
1691

1692
    @rtype: dict
1693
    @return: a dictionary with osnames as keys and as value another map, with
1694
        nodes as keys and list of OS objects as values, eg::
1695

1696
          {"debian-etch": {"node1": [<object>,...],
1697
                           "node2": [<object>,]}
1698
          }
1699

1700
    """
1701
    all_os = {}
1702
    for node_name, nr in rlist.iteritems():
1703
      if nr.failed or not nr.data:
1704
        continue
1705
      for os_obj in nr.data:
1706
        if os_obj.name not in all_os:
1707
          # build a list of nodes for this os containing empty lists
1708
          # for each node in node_list
1709
          all_os[os_obj.name] = {}
1710
          for nname in node_list:
1711
            all_os[os_obj.name][nname] = []
1712
        all_os[os_obj.name][node_name].append(os_obj)
1713
    return all_os
1714

    
1715
  def Exec(self, feedback_fn):
1716
    """Compute the list of OSes.
1717

1718
    """
1719
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1720
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()
1721
                   if node in node_list]
1722
    node_data = self.rpc.call_os_diagnose(valid_nodes)
1723
    if node_data == False:
1724
      raise errors.OpExecError("Can't gather the list of OSes")
1725
    pol = self._DiagnoseByOS(valid_nodes, node_data)
1726
    output = []
1727
    for os_name, os_data in pol.iteritems():
1728
      row = []
1729
      for field in self.op.output_fields:
1730
        if field == "name":
1731
          val = os_name
1732
        elif field == "valid":
1733
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1734
        elif field == "node_status":
1735
          val = {}
1736
          for node_name, nos_list in os_data.iteritems():
1737
            val[node_name] = [(v.status, v.path) for v in nos_list]
1738
        else:
1739
          raise errors.ParameterError(field)
1740
        row.append(val)
1741
      output.append(row)
1742

    
1743
    return output
1744

    
1745

    
1746
class LURemoveNode(LogicalUnit):
1747
  """Logical unit for removing a node.
1748

1749
  """
1750
  HPATH = "node-remove"
1751
  HTYPE = constants.HTYPE_NODE
1752
  _OP_REQP = ["node_name"]
1753

    
1754
  def BuildHooksEnv(self):
1755
    """Build hooks env.
1756

1757
    This doesn't run on the target node in the pre phase as a failed
1758
    node would then be impossible to remove.
1759

1760
    """
1761
    env = {
1762
      "OP_TARGET": self.op.node_name,
1763
      "NODE_NAME": self.op.node_name,
1764
      }
1765
    all_nodes = self.cfg.GetNodeList()
1766
    all_nodes.remove(self.op.node_name)
1767
    return env, all_nodes, all_nodes
1768

    
1769
  def CheckPrereq(self):
1770
    """Check prerequisites.
1771

1772
    This checks:
1773
     - the node exists in the configuration
1774
     - it does not have primary or secondary instances
1775
     - it's not the master
1776

1777
    Any errors are signalled by raising errors.OpPrereqError.
1778

1779
    """
1780
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1781
    if node is None:
1782
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1783

    
1784
    instance_list = self.cfg.GetInstanceList()
1785

    
1786
    masternode = self.cfg.GetMasterNode()
1787
    if node.name == masternode:
1788
      raise errors.OpPrereqError("Node is the master node,"
1789
                                 " you need to failover first.")
1790

    
1791
    for instance_name in instance_list:
1792
      instance = self.cfg.GetInstanceInfo(instance_name)
1793
      if node.name in instance.all_nodes:
1794
        raise errors.OpPrereqError("Instance %s is still running on the node,"
1795
                                   " please remove first." % instance_name)
1796
    self.op.node_name = node.name
1797
    self.node = node
1798

    
1799
  def Exec(self, feedback_fn):
1800
    """Removes the node from the cluster.
1801

1802
    """
1803
    node = self.node
1804
    logging.info("Stopping the node daemon and removing configs from node %s",
1805
                 node.name)
1806

    
1807
    self.context.RemoveNode(node.name)
1808

    
1809
    self.rpc.call_node_leave_cluster(node.name)
1810

    
1811
    # Promote nodes to master candidate as needed
1812
    _AdjustCandidatePool(self)
1813

    
1814

    
1815
class LUQueryNodes(NoHooksLU):
1816
  """Logical unit for querying nodes.
1817

1818
  """
1819
  _OP_REQP = ["output_fields", "names", "use_locking"]
1820
  REQ_BGL = False
1821
  _FIELDS_DYNAMIC = utils.FieldSet(
1822
    "dtotal", "dfree",
1823
    "mtotal", "mnode", "mfree",
1824
    "bootid",
1825
    "ctotal", "cnodes", "csockets",
1826
    )
1827

    
1828
  _FIELDS_STATIC = utils.FieldSet(
1829
    "name", "pinst_cnt", "sinst_cnt",
1830
    "pinst_list", "sinst_list",
1831
    "pip", "sip", "tags",
1832
    "serial_no",
1833
    "master_candidate",
1834
    "master",
1835
    "offline",
1836
    "drained",
1837
    )
1838

    
1839
  def ExpandNames(self):
1840
    _CheckOutputFields(static=self._FIELDS_STATIC,
1841
                       dynamic=self._FIELDS_DYNAMIC,
1842
                       selected=self.op.output_fields)
1843

    
1844
    self.needed_locks = {}
1845
    self.share_locks[locking.LEVEL_NODE] = 1
1846

    
1847
    if self.op.names:
1848
      self.wanted = _GetWantedNodes(self, self.op.names)
1849
    else:
1850
      self.wanted = locking.ALL_SET
1851

    
1852
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1853
    self.do_locking = self.do_node_query and self.op.use_locking
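    # note: dynamic fields are still fetched via RPC even when locking is
    # not requested; the shared node locks mainly guard against concurrent
    # node addition/removal while the query runs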
1854
    if self.do_locking:
1855
      # if we don't request only static fields, we need to lock the nodes
1856
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
1857

    
1858

    
1859
  def CheckPrereq(self):
1860
    """Check prerequisites.
1861

1862
    """
1863
    # The validation of the node list is done in the _GetWantedNodes,
1864
    # if non empty, and if empty, there's no validation to do
1865
    pass
1866

    
1867
  def Exec(self, feedback_fn):
1868
    """Computes the list of nodes and their attributes.
1869

1870
    """
1871
    all_info = self.cfg.GetAllNodesInfo()
1872
    if self.do_locking:
1873
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1874
    elif self.wanted != locking.ALL_SET:
1875
      nodenames = self.wanted
1876
      missing = set(nodenames).difference(all_info.keys())
1877
      if missing:
1878
        raise errors.OpExecError(
1879
          "Some nodes were removed before retrieving their data: %s" % missing)
1880
    else:
1881
      nodenames = all_info.keys()
1882

    
1883
    nodenames = utils.NiceSort(nodenames)
1884
    nodelist = [all_info[name] for name in nodenames]
1885

    
1886
    # begin data gathering
1887

    
1888
    if self.do_node_query:
1889
      live_data = {}
1890
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1891
                                          self.cfg.GetHypervisorType())
1892
      for name in nodenames:
1893
        nodeinfo = node_data[name]
1894
        if not nodeinfo.failed and nodeinfo.data:
1895
          nodeinfo = nodeinfo.data
1896
          fn = utils.TryConvert
1897
          live_data[name] = {
1898
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1899
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1900
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
1901
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1902
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
1903
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1904
            "bootid": nodeinfo.get('bootid', None),
1905
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
1906
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
1907
            }
1908
        else:
1909
          live_data[name] = {}
1910
    else:
1911
      live_data = dict.fromkeys(nodenames, {})
1912

    
1913
    node_to_primary = dict([(name, set()) for name in nodenames])
1914
    node_to_secondary = dict([(name, set()) for name in nodenames])
1915

    
1916
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1917
                             "sinst_cnt", "sinst_list"))
1918
    if inst_fields & frozenset(self.op.output_fields):
1919
      instancelist = self.cfg.GetInstanceList()
1920

    
1921
      for instance_name in instancelist:
1922
        inst = self.cfg.GetInstanceInfo(instance_name)
1923
        if inst.primary_node in node_to_primary:
1924
          node_to_primary[inst.primary_node].add(inst.name)
1925
        for secnode in inst.secondary_nodes:
1926
          if secnode in node_to_secondary:
1927
            node_to_secondary[secnode].add(inst.name)
1928

    
1929
    master_node = self.cfg.GetMasterNode()
1930

    
1931
    # end data gathering
1932

    
1933
    output = []
1934
    for node in nodelist:
1935
      node_output = []
1936
      for field in self.op.output_fields:
1937
        if field == "name":
1938
          val = node.name
1939
        elif field == "pinst_list":
1940
          val = list(node_to_primary[node.name])
1941
        elif field == "sinst_list":
1942
          val = list(node_to_secondary[node.name])
1943
        elif field == "pinst_cnt":
1944
          val = len(node_to_primary[node.name])
1945
        elif field == "sinst_cnt":
1946
          val = len(node_to_secondary[node.name])
1947
        elif field == "pip":
1948
          val = node.primary_ip
1949
        elif field == "sip":
1950
          val = node.secondary_ip
1951
        elif field == "tags":
1952
          val = list(node.GetTags())
1953
        elif field == "serial_no":
1954
          val = node.serial_no
1955
        elif field == "master_candidate":
1956
          val = node.master_candidate
1957
        elif field == "master":
1958
          val = node.name == master_node
1959
        elif field == "offline":
1960
          val = node.offline
1961
        elif field == "drained":
1962
          val = node.drained
1963
        elif self._FIELDS_DYNAMIC.Matches(field):
1964
          val = live_data[node.name].get(field, None)
1965
        else:
1966
          raise errors.ParameterError(field)
1967
        node_output.append(val)
1968
      output.append(node_output)
1969

    
1970
    return output
1971

    
1972

    
1973
class LUQueryNodeVolumes(NoHooksLU):
1974
  """Logical unit for getting volumes on node(s).
1975

1976
  """
1977
  _OP_REQP = ["nodes", "output_fields"]
1978
  REQ_BGL = False
1979
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1980
  _FIELDS_STATIC = utils.FieldSet("node")
1981

    
1982
  def ExpandNames(self):
1983
    _CheckOutputFields(static=self._FIELDS_STATIC,
1984
                       dynamic=self._FIELDS_DYNAMIC,
1985
                       selected=self.op.output_fields)
1986

    
1987
    self.needed_locks = {}
1988
    self.share_locks[locking.LEVEL_NODE] = 1
1989
    if not self.op.nodes:
1990
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1991
    else:
1992
      self.needed_locks[locking.LEVEL_NODE] = \
1993
        _GetWantedNodes(self, self.op.nodes)
1994

    
1995
  def CheckPrereq(self):
1996
    """Check prerequisites.
1997

1998
    This checks that the fields required are valid output fields.
1999

2000
    """
2001
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2002

    
2003
  def Exec(self, feedback_fn):
2004
    """Computes the list of nodes and their attributes.
2005

2006
    """
2007
    nodenames = self.nodes
2008
    volumes = self.rpc.call_node_volumes(nodenames)
2009

    
2010
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
2011
             in self.cfg.GetInstanceList()]
2012

    
2013
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2014

    
2015
    output = []
2016
    for node in nodenames:
2017
      if node not in volumes or volumes[node].failed or not volumes[node].data:
2018
        continue
2019

    
2020
      node_vols = volumes[node].data[:]
2021
      node_vols.sort(key=lambda vol: vol['dev'])
2022

    
2023
      for vol in node_vols:
2024
        node_output = []
2025
        for field in self.op.output_fields:
2026
          if field == "node":
2027
            val = node
2028
          elif field == "phys":
2029
            val = vol['dev']
2030
          elif field == "vg":
2031
            val = vol['vg']
2032
          elif field == "name":
2033
            val = vol['name']
2034
          elif field == "size":
2035
            val = int(float(vol['size']))
2036
          elif field == "instance":
2037
            for inst in ilist:
2038
              if node not in lv_by_node[inst]:
2039
                continue
2040
              if vol['name'] in lv_by_node[inst][node]:
2041
                val = inst.name
2042
                break
2043
            else:
2044
              val = '-'
2045
          else:
2046
            raise errors.ParameterError(field)
2047
          node_output.append(str(val))
2048

    
2049
        output.append(node_output)
2050

    
2051
    return output
2052

    
2053

    
2054
class LUAddNode(LogicalUnit):
2055
  """Logical unit for adding node to the cluster.
2056

2057
  """
2058
  HPATH = "node-add"
2059
  HTYPE = constants.HTYPE_NODE
2060
  _OP_REQP = ["node_name"]
2061

    
2062
  def BuildHooksEnv(self):
2063
    """Build hooks env.
2064

2065
    This will run on all nodes before, and on all nodes + the new node after.
2066

2067
    """
2068
    env = {
2069
      "OP_TARGET": self.op.node_name,
2070
      "NODE_NAME": self.op.node_name,
2071
      "NODE_PIP": self.op.primary_ip,
2072
      "NODE_SIP": self.op.secondary_ip,
2073
      }
2074
    nodes_0 = self.cfg.GetNodeList()
2075
    nodes_1 = nodes_0 + [self.op.node_name, ]
2076
    return env, nodes_0, nodes_1
2077

    
2078
  def CheckPrereq(self):
2079
    """Check prerequisites.
2080

2081
    This checks:
2082
     - the new node is not already in the config
2083
     - it is resolvable
2084
     - its parameters (single/dual homed) matches the cluster
2085

2086
    Any errors are signalled by raising errors.OpPrereqError.
2087

2088
    """
2089
    node_name = self.op.node_name
2090
    cfg = self.cfg
2091

    
2092
    dns_data = utils.HostInfo(node_name)
2093

    
2094
    node = dns_data.name
2095
    primary_ip = self.op.primary_ip = dns_data.ip
2096
    secondary_ip = getattr(self.op, "secondary_ip", None)
2097
    if secondary_ip is None:
2098
      secondary_ip = primary_ip
2099
    if not utils.IsValidIP(secondary_ip):
2100
      raise errors.OpPrereqError("Invalid secondary IP given")
2101
    self.op.secondary_ip = secondary_ip
2102

    
2103
    node_list = cfg.GetNodeList()
2104
    if not self.op.readd and node in node_list:
2105
      raise errors.OpPrereqError("Node %s is already in the configuration" %
2106
                                 node)
2107
    elif self.op.readd and node not in node_list:
2108
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2109

    
2110
    for existing_node_name in node_list:
2111
      existing_node = cfg.GetNodeInfo(existing_node_name)
2112

    
2113
      if self.op.readd and node == existing_node_name:
2114
        if (existing_node.primary_ip != primary_ip or
2115
            existing_node.secondary_ip != secondary_ip):
2116
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
2117
                                     " address configuration as before")
2118
        continue
2119

    
2120
      if (existing_node.primary_ip == primary_ip or
2121
          existing_node.secondary_ip == primary_ip or
2122
          existing_node.primary_ip == secondary_ip or
2123
          existing_node.secondary_ip == secondary_ip):
2124
        raise errors.OpPrereqError("New node ip address(es) conflict with"
2125
                                   " existing node %s" % existing_node.name)
2126

    
2127
    # check that the type of the node (single versus dual homed) is the
2128
    # same as for the master
2129
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2130
    master_singlehomed = myself.secondary_ip == myself.primary_ip
2131
    newbie_singlehomed = secondary_ip == primary_ip
2132
    if master_singlehomed != newbie_singlehomed:
2133
      if master_singlehomed:
2134
        raise errors.OpPrereqError("The master has no private ip but the"
2135
                                   " new node has one")
2136
      else:
2137
        raise errors.OpPrereqError("The master has a private ip but the"
2138
                                   " new node doesn't have one")
2139

    
2140
    # checks reachability
2141
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2142
      raise errors.OpPrereqError("Node not reachable by ping")
2143

    
2144
    if not newbie_singlehomed:
2145
      # check reachability from my secondary ip to newbie's secondary ip
2146
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2147
                           source=myself.secondary_ip):
2148
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2149
                                   " based ping to noded port")
2150

    
2151
    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2152
    mc_now, _ = self.cfg.GetMasterCandidateStats()
2153
    master_candidate = mc_now < cp_size
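    # the new node is automatically promoted to master candidate whenever
    # the current candidate pool is below the configured pool size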
2154

    
2155
    self.new_node = objects.Node(name=node,
2156
                                 primary_ip=primary_ip,
2157
                                 secondary_ip=secondary_ip,
2158
                                 master_candidate=master_candidate,
2159
                                 offline=False, drained=False)
2160

    
2161
  def Exec(self, feedback_fn):
2162
    """Adds the new node to the cluster.
2163

2164
    """
2165
    new_node = self.new_node
2166
    node = new_node.name
2167

    
2168
    # check connectivity
2169
    result = self.rpc.call_version([node])[node]
2170
    result.Raise()
2171
    if result.data:
2172
      if constants.PROTOCOL_VERSION == result.data:
2173
        logging.info("Communication to node %s fine, sw version %s match",
2174
                     node, result.data)
2175
      else:
2176
        raise errors.OpExecError("Version mismatch master version %s,"
2177
                                 " node version %s" %
2178
                                 (constants.PROTOCOL_VERSION, result.data))
2179
    else:
2180
      raise errors.OpExecError("Cannot get version from the new node")
2181

    
2182
    # setup ssh on node
2183
    logging.info("Copy ssh key to node %s", node)
2184
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2185
    keyarray = []
2186
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2187
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2188
                priv_key, pub_key]
2189

    
2190
    for i in keyfiles:
2191
      f = open(i, 'r')
2192
      try:
2193
        keyarray.append(f.read())
2194
      finally:
2195
        f.close()
2196

    
2197
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2198
                                    keyarray[2],
2199
                                    keyarray[3], keyarray[4], keyarray[5])
2200

    
2201
    msg = result.RemoteFailMsg()
2202
    if msg:
2203
      raise errors.OpExecError("Cannot transfer ssh keys to the"
2204
                               " new node: %s" % msg)
2205

    
2206
    # Add node to our /etc/hosts, and add key to known_hosts
2207
    utils.AddHostToEtcHosts(new_node.name)
2208

    
2209
    if new_node.secondary_ip != new_node.primary_ip:
2210
      result = self.rpc.call_node_has_ip_address(new_node.name,
2211
                                                 new_node.secondary_ip)
2212
      if result.failed or not result.data:
2213
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2214
                                 " you gave (%s). Please fix and re-run this"
2215
                                 " command." % new_node.secondary_ip)
2216

    
2217
    node_verify_list = [self.cfg.GetMasterNode()]
2218
    node_verify_param = {
2219
      'nodelist': [node],
2220
      # TODO: do a node-net-test as well?
2221
    }
2222

    
2223
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2224
                                       self.cfg.GetClusterName())
2225
    for verifier in node_verify_list:
2226
      if result[verifier].failed or not result[verifier].data:
2227
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
2228
                                 " for remote verification" % verifier)
2229
      if result[verifier].data['nodelist']:
2230
        for failed in result[verifier].data['nodelist']:
2231
          feedback_fn("ssh/hostname verification failed %s -> %s" %
2232
                      (verifier, result[verifier].data['nodelist'][failed]))
2233
        raise errors.OpExecError("ssh/hostname verification failed.")
2234

    
2235
    # Distribute updated /etc/hosts and known_hosts to all nodes,
2236
    # including the node just added
2237
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2238
    dist_nodes = self.cfg.GetNodeList()
2239
    if not self.op.readd:
2240
      dist_nodes.append(node)
2241
    if myself.name in dist_nodes:
2242
      dist_nodes.remove(myself.name)
2243

    
2244
    logging.debug("Copying hosts and known_hosts to all nodes")
2245
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2246
      result = self.rpc.call_upload_file(dist_nodes, fname)
2247
      for to_node, to_result in result.iteritems():
2248
        if to_result.failed or not to_result.data:
2249
          logging.error("Copy of file %s to node %s failed", fname, to_node)
2250

    
2251
    to_copy = []
2252
    enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2253
    if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
2254
      to_copy.append(constants.VNC_PASSWORD_FILE)
2255

    
2256
    for fname in to_copy:
2257
      result = self.rpc.call_upload_file([node], fname)
2258
      if result[node].failed or not result[node]:
2259
        logging.error("Could not copy file %s to node %s", fname, node)
2260

    
2261
    if self.op.readd:
2262
      self.context.ReaddNode(new_node)
2263
    else:
2264
      self.context.AddNode(new_node)
2265

    
2266

    
2267
class LUSetNodeParams(LogicalUnit):
2268
  """Modifies the parameters of a node.
2269

2270
  """
2271
  HPATH = "node-modify"
2272
  HTYPE = constants.HTYPE_NODE
2273
  _OP_REQP = ["node_name"]
2274
  REQ_BGL = False
2275

    
2276
  def CheckArguments(self):
2277
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
2278
    if node_name is None:
2279
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2280
    self.op.node_name = node_name
2281
    _CheckBooleanOpField(self.op, 'master_candidate')
2282
    _CheckBooleanOpField(self.op, 'offline')
2283
    _CheckBooleanOpField(self.op, 'drained')
2284
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2285
    if all_mods.count(None) == 3:
2286
      raise errors.OpPrereqError("Please pass at least one modification")
2287
    if all_mods.count(True) > 1:
2288
      raise errors.OpPrereqError("Can't set the node into more than one"
2289
                                 " state at the same time")
2290

    
2291
  def ExpandNames(self):
2292
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2293

    
2294
  def BuildHooksEnv(self):
2295
    """Build hooks env.
2296

2297
    This runs on the master node.
2298

2299
    """
2300
    env = {
2301
      "OP_TARGET": self.op.node_name,
2302
      "MASTER_CANDIDATE": str(self.op.master_candidate),
2303
      "OFFLINE": str(self.op.offline),
2304
      "DRAINED": str(self.op.drained),
2305
      }
2306
    nl = [self.cfg.GetMasterNode(),
2307
          self.op.node_name]
2308
    return env, nl, nl
2309

    
2310
  def CheckPrereq(self):
2311
    """Check prerequisites.
2312

2313
    This only checks the instance list against the existing names.
2314

2315
    """
2316
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2317

    
2318
    if ((self.op.master_candidate == False or self.op.offline == True or
2319
         self.op.drained == True) and node.master_candidate):
2320
      # we will demote the node from master_candidate
2321
      if self.op.node_name == self.cfg.GetMasterNode():
2322
        raise errors.OpPrereqError("The master node has to be a"
2323
                                   " master candidate, online and not drained")
2324
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2325
      num_candidates, _ = self.cfg.GetMasterCandidateStats()
2326
      if num_candidates <= cp_size:
2327
        msg = ("Not enough master candidates (desired"
2328
               " %d, new value will be %d)" % (cp_size, num_candidates-1))
2329
        if self.op.force:
2330
          self.LogWarning(msg)
2331
        else:
2332
          raise errors.OpPrereqError(msg)
2333

    
2334
    if (self.op.master_candidate == True and
2335
        ((node.offline and not self.op.offline == False) or
2336
         (node.drained and not self.op.drained == False))):
2337
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2338
                                 " to master_candidate" % node.name)
2339

    
2340
    return
2341

    
2342
  def Exec(self, feedback_fn):
2343
    """Modifies a node.
2344

2345
    """
2346
    node = self.node
2347

    
2348
    result = []
2349
    changed_mc = False
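    # changed_mc tracks whether the master candidate flag was modified, so
    # that the node is only re-added to the cluster context (triggering job
    # queue propagation or cleanup, see below) when actually needed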
2350

    
2351
    if self.op.offline is not None:
2352
      node.offline = self.op.offline
2353
      result.append(("offline", str(self.op.offline)))
2354
      if self.op.offline == True:
2355
        if node.master_candidate:
2356
          node.master_candidate = False
2357
          changed_mc = True
2358
          result.append(("master_candidate", "auto-demotion due to offline"))
2359
        if node.drained:
2360
          node.drained = False
2361
          result.append(("drained", "clear drained status due to offline"))
2362

    
2363
    if self.op.master_candidate is not None:
2364
      node.master_candidate = self.op.master_candidate
2365
      changed_mc = True
2366
      result.append(("master_candidate", str(self.op.master_candidate)))
2367
      if self.op.master_candidate == False:
2368
        rrc = self.rpc.call_node_demote_from_mc(node.name)
2369
        msg = rrc.RemoteFailMsg()
2370
        if msg:
2371
          self.LogWarning("Node failed to demote itself: %s" % msg)
2372

    
2373
    if self.op.drained is not None:
2374
      node.drained = self.op.drained
2375
      result.append(("drained", str(self.op.drained)))
2376
      if self.op.drained == True:
2377
        if node.master_candidate:
2378
          node.master_candidate = False
2379
          changed_mc = True
2380
          result.append(("master_candidate", "auto-demotion due to drain"))
2381
        if node.offline:
2382
          node.offline = False
2383
          result.append(("offline", "clear offline status due to drain"))
2384

    
2385
    # this will trigger configuration file update, if needed
2386
    self.cfg.Update(node)
2387
    # this will trigger job queue propagation or cleanup
2388
    if changed_mc:
2389
      self.context.ReaddNode(node)
2390

    
2391
    return result
2392

    
2393

    
2394
class LUQueryClusterInfo(NoHooksLU):
2395
  """Query cluster configuration.
2396

2397
  """
2398
  _OP_REQP = []
2399
  REQ_BGL = False
2400

    
2401
  def ExpandNames(self):
2402
    self.needed_locks = {}
2403

    
2404
  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
2411
    """Return cluster config.
2412

2413
    """
2414
    cluster = self.cfg.GetClusterInfo()
2415
    result = {
2416
      "software_version": constants.RELEASE_VERSION,
2417
      "protocol_version": constants.PROTOCOL_VERSION,
2418
      "config_version": constants.CONFIG_VERSION,
2419
      "os_api_version": constants.OS_API_VERSION,
2420
      "export_version": constants.EXPORT_VERSION,
2421
      "architecture": (platform.architecture()[0], platform.machine()),
2422
      "name": cluster.cluster_name,
2423
      "master": cluster.master_node,
2424
      "default_hypervisor": cluster.default_hypervisor,
2425
      "enabled_hypervisors": cluster.enabled_hypervisors,
2426
      "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2427
                        for hypervisor in cluster.enabled_hypervisors]),
2428
      "beparams": cluster.beparams,
2429
      "candidate_pool_size": cluster.candidate_pool_size,
2430
      }
2431

    
2432
    return result
2433

    
2434

    
2435
class LUQueryConfigValues(NoHooksLU):
2436
  """Return configuration values.
2437

2438
  """
2439
  _OP_REQP = []
2440
  REQ_BGL = False
2441
  _FIELDS_DYNAMIC = utils.FieldSet()
2442
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2443

    
2444
  def ExpandNames(self):
2445
    self.needed_locks = {}
2446

    
2447
    _CheckOutputFields(static=self._FIELDS_STATIC,
2448
                       dynamic=self._FIELDS_DYNAMIC,
2449
                       selected=self.op.output_fields)
2450

    
2451
  def CheckPrereq(self):
2452
    """No prerequisites.
2453

2454
    """
2455
    pass
2456

    
2457
  def Exec(self, feedback_fn):
2458
    """Dump a representation of the cluster config to the standard output.
2459

2460
    """
2461
    values = []
2462
    for field in self.op.output_fields:
2463
      if field == "cluster_name":
2464
        entry = self.cfg.GetClusterName()
2465
      elif field == "master_node":
2466
        entry = self.cfg.GetMasterNode()
2467
      elif field == "drain_flag":
2468
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2469
      else:
2470
        raise errors.ParameterError(field)
2471
      values.append(entry)
2472
    return values
2473

    
2474

    
2475
class LUActivateInstanceDisks(NoHooksLU):
2476
  """Bring up an instance's disks.
2477

2478
  """
2479
  _OP_REQP = ["instance_name"]
2480
  REQ_BGL = False
2481

    
2482
  def ExpandNames(self):
2483
    self._ExpandAndLockInstance()
2484
    self.needed_locks[locking.LEVEL_NODE] = []
2485
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2486

    
2487
  def DeclareLocks(self, level):
2488
    if level == locking.LEVEL_NODE:
2489
      self._LockInstancesNodes()
2490

    
2491
  def CheckPrereq(self):
2492
    """Check prerequisites.
2493

2494
    This checks that the instance is in the cluster.
2495

2496
    """
2497
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2498
    assert self.instance is not None, \
2499
      "Cannot retrieve locked instance %s" % self.op.instance_name
2500
    _CheckNodeOnline(self, self.instance.primary_node)
2501

    
2502
  def Exec(self, feedback_fn):
2503
    """Activate the disks.
2504

2505
    """
2506
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2507
    if not disks_ok:
2508
      raise errors.OpExecError("Cannot activate block devices")
2509

    
2510
    return disks_info
2511

    
2512

    
2513
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2514
  """Prepare the block devices for an instance.
2515

2516
  This sets up the block devices on all nodes.
2517

2518
  @type lu: L{LogicalUnit}
2519
  @param lu: the logical unit on whose behalf we execute
2520
  @type instance: L{objects.Instance}
2521
  @param instance: the instance for whose disks we assemble
2522
  @type ignore_secondaries: boolean
2523
  @param ignore_secondaries: if true, errors on secondary nodes
2524
      won't result in an error return from the function
2525
  @return: a tuple of (disks_ok, device_info), where disks_ok is a
      boolean indicating whether all devices could be assembled, and
      device_info is a list of (host, instance_visible_name,
      node_visible_name) tuples with the mapping from node devices
      to instance devices
2528

2529
  """
2530
  device_info = []
2531
  disks_ok = True
2532
  iname = instance.name
2533
  # With the two-pass mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it
2536

    
2537
  # The proper fix would be to wait (with some limits) until the
2538
  # connection has been made and drbd transitions from WFConnection
2539
  # into any other network-connected state (Connected, SyncTarget,
2540
  # SyncSource, etc.)
2541

    
2542
  # 1st pass, assemble on all nodes in secondary mode
2543
  for inst_disk in instance.disks:
2544
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2545
      lu.cfg.SetDiskID(node_disk, node)
2546
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2547
      msg = result.RemoteFailMsg()
2548
      if msg:
2549
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2550
                           " (is_primary=False, pass=1): %s",
2551
                           inst_disk.iv_name, node, msg)
2552
        if not ignore_secondaries:
2553
          disks_ok = False
2554

    
2555
  # FIXME: race condition on drbd migration to primary
2556

    
2557
  # 2nd pass, do only the primary node
2558
  for inst_disk in instance.disks:
2559
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2560
      if node != instance.primary_node:
2561
        continue
2562
      lu.cfg.SetDiskID(node_disk, node)
2563
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2564
      msg = result.RemoteFailMsg()
2565
      if msg:
2566
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2567
                           " (is_primary=True, pass=2): %s",
2568
                           inst_disk.iv_name, node, msg)
2569
        disks_ok = False
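    # record the primary-node assembly result; result.payload is taken to
    # be the node-visible device name, per the return value described above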
2570
    device_info.append((instance.primary_node, inst_disk.iv_name,
2571
                        result.payload))
2572

    
2573
  # leave the disks configured for the primary node
2574
  # this is a workaround that would be fixed better by
2575
  # improving the logical/physical id handling
2576
  for disk in instance.disks:
2577
    lu.cfg.SetDiskID(disk, instance.primary_node)
2578

    
2579
  return disks_ok, device_info
2580

    
2581

    
2582
def _StartInstanceDisks(lu, instance, force):
2583
  """Start the disks of an instance.
2584

2585
  """
2586
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2587
                                           ignore_secondaries=force)
2588
  if not disks_ok:
2589
    _ShutdownInstanceDisks(lu, instance)
2590
    if force is not None and not force:
2591
      lu.proc.LogWarning("", hint="If the message above refers to a"
2592
                         " secondary node,"
2593
                         " you can retry the operation using '--force'.")
2594
    raise errors.OpExecError("Disk consistency error")
2595

    
2596

    
2597
class LUDeactivateInstanceDisks(NoHooksLU):
2598
  """Shutdown an instance's disks.
2599

2600
  """
2601
  _OP_REQP = ["instance_name"]
2602
  REQ_BGL = False
2603

    
2604
  def ExpandNames(self):
2605
    self._ExpandAndLockInstance()
2606
    self.needed_locks[locking.LEVEL_NODE] = []
2607
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2608

    
2609
  def DeclareLocks(self, level):
2610
    if level == locking.LEVEL_NODE:
2611
      self._LockInstancesNodes()
2612

    
2613
  def CheckPrereq(self):
2614
    """Check prerequisites.
2615

2616
    This checks that the instance is in the cluster.
2617

2618
    """
2619
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2620
    assert self.instance is not None, \
2621
      "Cannot retrieve locked instance %s" % self.op.instance_name
2622

    
2623
  def Exec(self, feedback_fn):
2624
    """Deactivate the disks
2625

2626
    """
2627
    instance = self.instance
2628
    _SafeShutdownInstanceDisks(self, instance)
2629

    
2630

    
2631
def _SafeShutdownInstanceDisks(lu, instance):
2632
  """Shutdown block devices of an instance.
2633

2634
  This function checks if an instance is running, before calling
2635
  _ShutdownInstanceDisks.
2636

2637
  """
2638
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
2639
                                      [instance.hypervisor])
2640
  ins_l = ins_l[instance.primary_node]
2641
  if ins_l.failed or not isinstance(ins_l.data, list):
2642
    raise errors.OpExecError("Can't contact node '%s'" %
2643
                             instance.primary_node)
2644

    
2645
  if instance.name in ins_l.data:
2646
    raise errors.OpExecError("Instance is running, can't shutdown"
2647
                             " block devices.")
2648

    
2649
  _ShutdownInstanceDisks(lu, instance)
2650

    
2651

    
2652
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is false, errors on the primary node are not
  ignored; if it is true, they only cause a warning to be logged.

  """
  all_result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
      msg = result.RemoteFailMsg()
      if msg:
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, node, msg)
        if not ignore_primary or node != instance.primary_node:
          all_result = False
  return all_result


def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2676
  """Checks if a node has enough free memory.
2677

2678
  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.
2682

2683
  @type lu: C{LogicalUnit}
2684
  @param lu: a logical unit from which we get configuration data
2685
  @type node: C{str}
2686
  @param node: the node to check
2687
  @type reason: C{str}
2688
  @param reason: string to use in the error message
2689
  @type requested: C{int}
2690
  @param requested: the amount of memory in MiB to check for
2691
  @type hypervisor_name: C{str}
2692
  @param hypervisor_name: the hypervisor to ask for memory stats
2693
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2694
      we cannot check the node
2695

2696
  """
2697
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2698
  nodeinfo[node].Raise()
2699
  free_mem = nodeinfo[node].data.get('memory_free')
2700
  if not isinstance(free_mem, int):
2701
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2702
                             " was '%s'" % (node, free_mem))
2703
  if requested > free_mem:
2704
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2705
                             " needed %s MiB, available %s MiB" %
2706
                             (node, reason, requested, free_mem))
2707

    
2708

    
2709
class LUStartupInstance(LogicalUnit):
2710
  """Starts an instance.
2711

2712
  """
2713
  HPATH = "instance-start"
2714
  HTYPE = constants.HTYPE_INSTANCE
2715
  _OP_REQP = ["instance_name", "force"]
2716
  REQ_BGL = False
2717

    
2718
  def ExpandNames(self):
2719
    self._ExpandAndLockInstance()
2720

    
2721
  def BuildHooksEnv(self):
2722
    """Build hooks env.
2723

2724
    This runs on master, primary and secondary nodes of the instance.
2725

2726
    """
2727
    env = {
2728
      "FORCE": self.op.force,
2729
      }
2730
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2731
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2732
    return env, nl, nl
2733

    
2734
  def CheckPrereq(self):
2735
    """Check prerequisites.
2736

2737
    This checks that the instance is in the cluster.
2738

2739
    """
2740
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2741
    assert self.instance is not None, \
2742
      "Cannot retrieve locked instance %s" % self.op.instance_name
2743

    
2744
    _CheckNodeOnline(self, instance.primary_node)
2745

    
2746
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2747
    # check bridges existence
2748
    _CheckInstanceBridgesExist(self, instance)
2749

    
2750
    _CheckNodeFreeMemory(self, instance.primary_node,
2751
                         "starting instance %s" % instance.name,
2752
                         bep[constants.BE_MEMORY], instance.hypervisor)
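    # the memory check uses the fully filled-in BE parameters, so instance
    # overrides as well as cluster-level defaults are taken into account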
2753

    
2754
  def Exec(self, feedback_fn):
2755
    """Start the instance.
2756

2757
    """
2758
    instance = self.instance
2759
    force = self.op.force
2760

    
2761
    self.cfg.MarkInstanceUp(instance.name)
2762

    
2763
    node_current = instance.primary_node
2764

    
2765
    _StartInstanceDisks(self, instance, force)
2766

    
2767
    result = self.rpc.call_instance_start(node_current, instance)
2768
    msg = result.RemoteFailMsg()
2769
    if msg:
2770
      _ShutdownInstanceDisks(self, instance)
2771
      raise errors.OpExecError("Could not start instance: %s" % msg)
2772

    
2773

    
2774
class LURebootInstance(LogicalUnit):
2775
  """Reboot an instance.
2776

2777
  """
2778
  HPATH = "instance-reboot"
2779
  HTYPE = constants.HTYPE_INSTANCE
2780
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2781
  REQ_BGL = False
2782

    
2783
  def ExpandNames(self):
2784
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2785
                                   constants.INSTANCE_REBOOT_HARD,
2786
                                   constants.INSTANCE_REBOOT_FULL]:
2787
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2788
                                  (constants.INSTANCE_REBOOT_SOFT,
2789
                                   constants.INSTANCE_REBOOT_HARD,
2790
                                   constants.INSTANCE_REBOOT_FULL))
2791
    self._ExpandAndLockInstance()
2792

    
2793
  def BuildHooksEnv(self):
2794
    """Build hooks env.
2795

2796
    This runs on master, primary and secondary nodes of the instance.
2797

2798
    """
2799
    env = {
2800
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2801
      "REBOOT_TYPE": self.op.reboot_type,
2802
      }
2803
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2804
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2805
    return env, nl, nl
2806

    
2807
  def CheckPrereq(self):
2808
    """Check prerequisites.
2809

2810
    This checks that the instance is in the cluster.
2811

2812
    """
2813
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2814
    assert self.instance is not None, \
2815
      "Cannot retrieve locked instance %s" % self.op.instance_name
2816

    
2817
    _CheckNodeOnline(self, instance.primary_node)
2818

    
2819
    # check bridges existance
2820
    _CheckInstanceBridgesExist(self, instance)
2821

    
2822
  def Exec(self, feedback_fn):
2823
    """Reboot the instance.
2824

2825
    """
2826
    instance = self.instance
2827
    ignore_secondaries = self.op.ignore_secondaries
2828
    reboot_type = self.op.reboot_type
2829

    
2830
    node_current = instance.primary_node
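    # soft/hard reboots are delegated to the hypervisor on the primary node;
    # a full reboot is emulated below by shutting the instance down,
    # recycling its disks and starting it again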
2831

    
2832
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2833
                       constants.INSTANCE_REBOOT_HARD]:
2834
      for disk in instance.disks:
2835
        self.cfg.SetDiskID(disk, node_current)
2836
      result = self.rpc.call_instance_reboot(node_current, instance,
2837
                                             reboot_type)
2838
      msg = result.RemoteFailMsg()
2839
      if msg:
2840
        raise errors.OpExecError("Could not reboot instance: %s" % msg)
2841
    else:
2842
      result = self.rpc.call_instance_shutdown(node_current, instance)
2843
      msg = result.RemoteFailMsg()
2844
      if msg:
2845
        raise errors.OpExecError("Could not shutdown instance for"
2846
                                 " full reboot: %s" % msg)
2847
      _ShutdownInstanceDisks(self, instance)
2848
      _StartInstanceDisks(self, instance, ignore_secondaries)
2849
      result = self.rpc.call_instance_start(node_current, instance)
2850
      msg = result.RemoteFailMsg()
2851
      if msg:
2852
        _ShutdownInstanceDisks(self, instance)
2853
        raise errors.OpExecError("Could not start instance for"
2854
                                 " full reboot: %s" % msg)
2855

    
2856
    self.cfg.MarkInstanceUp(instance.name)
2857

    
2858

    
2859
class LUShutdownInstance(LogicalUnit):
2860
  """Shutdown an instance.
2861

2862
  """
2863
  HPATH = "instance-stop"
2864
  HTYPE = constants.HTYPE_INSTANCE
2865
  _OP_REQP = ["instance_name"]
2866
  REQ_BGL = False
2867

    
2868
  def ExpandNames(self):
2869
    self._ExpandAndLockInstance()
2870

    
2871
  def BuildHooksEnv(self):
2872
    """Build hooks env.
2873

2874
    This runs on master, primary and secondary nodes of the instance.
2875

2876
    """
2877
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2878
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2879
    return env, nl, nl
2880

    
2881
  def CheckPrereq(self):
2882
    """Check prerequisites.
2883

2884
    This checks that the instance is in the cluster.
2885

2886
    """
2887
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2888
    assert self.instance is not None, \
2889
      "Cannot retrieve locked instance %s" % self.op.instance_name
2890
    _CheckNodeOnline(self, self.instance.primary_node)
2891

    
2892
  def Exec(self, feedback_fn):
2893
    """Shutdown the instance.
2894

2895
    """
2896
    instance = self.instance
2897
    node_current = instance.primary_node
2898
    self.cfg.MarkInstanceDown(instance.name)
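    # the instance is marked down in the configuration before the actual
    # shutdown call, so that automated tools (e.g. the watcher) do not try
    # to restart it in the meantime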
2899
    result = self.rpc.call_instance_shutdown(node_current, instance)
2900
    msg = result.RemoteFailMsg()
2901
    if msg:
2902
      self.proc.LogWarning("Could not shutdown instance: %s" % msg)
2903

    
2904
    _ShutdownInstanceDisks(self, instance)
2905

    
2906

    
2907
class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, instance.primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    if remote_info.failed or remote_info.data:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   self.op.pnode)
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
      result.Raise()
      if not isinstance(result.data, objects.OS):
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node"  % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.Update(inst)

    _StartInstanceDisks(self, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      result = self.rpc.call_instance_os_add(inst.primary_node, inst)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s: %s" %
                                 (inst.name, inst.primary_node, msg))
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    _CheckNodeOnline(self, instance.primary_node)

    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise()
    if remote_info.data:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))


  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
                                                     old_file_storage_dir,
                                                     new_file_storage_dir)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result.data[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self, inst, None)
    try:
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
                                                 old_name)
      msg = result.RemoteFailMsg()
      if msg:
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti): %s" %
               (inst.name, inst.primary_node, msg))
        self.proc.LogWarning(msg)
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logging.info("Shutting down instance %s on node %s",
                 instance.name, instance.primary_node)

    result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
    msg = result.RemoteFailMsg()
    if msg:
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance: %s" % msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, instance.primary_node, msg))

    logging.info("Removing block devices for instance %s", instance.name)

    if not _RemoveDisks(self, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logging.info("Removing instance %s out of cluster config", instance.name)

    self.cfg.RemoveInstance(instance.name)
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names", "use_locking"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
                                    "admin_state",
                                    "disk_template", "ip", "mac", "bridge",
                                    "sda_size", "sdb_size", "vcpus", "tags",
                                    "network_port", "beparams",
                                    r"(disk)\.(size)/([0-9]+)",
                                    r"(disk)\.(sizes)", "disk_usage",
                                    r"(nic)\.(mac|ip|bridge)/([0-9]+)",
                                    r"(nic)\.(macs|ips|bridges)",
                                    r"(disk|nic)\.(count)",
                                    "serial_no", "hypervisor", "hvparams",] +
                                  ["hv/%s" % name
                                   for name in constants.HVS_PARAMETERS] +
                                  ["be/%s" % name
                                   for name in constants.BES_PARAMETERS])
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
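  # Illustrative note: the regular expressions in _FIELDS_STATIC above make
  # per-item fields such as "disk.size/0" or "nic.mac/1" valid output fields,
  # alongside the aggregate forms "disk.sizes", "nic.macs" and
  # "disk.count"/"nic.count" that are handled in Exec below.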


  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.do_locking:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    all_info = self.cfg.GetAllInstancesInfo()
    if self.wanted == locking.ALL_SET:
      # caller didn't specify instance names, so ordering is not important
      if self.do_locking:
        instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
      else:
        instance_names = all_info.keys()
      instance_names = utils.NiceSort(instance_names)
    else:
      # caller did specify names, so we must keep the ordering
      if self.do_locking:
        tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
      else:
        tgt_set = all_info.keys()
      missing = set(self.wanted).difference(tgt_set)
      if missing:
        raise errors.OpExecError("Some instances were removed before"
                                 " retrieving their data: %s" % missing)
      instance_names = self.wanted

    instance_list = [all_info[iname] for iname in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])
    hv_list = list(set([inst.hypervisor for inst in instance_list]))

    bad_nodes = []
    off_nodes = []
    if self.do_node_query:
      live_data = {}
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
      for name in nodes:
        result = node_data[name]
        if result.offline:
          # offline nodes will be in both lists
          off_nodes.append(name)
        if result.failed:
          bad_nodes.append(name)
        else:
          if result.data:
            live_data.update(result.data)
            # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    HVPREFIX = "hv/"
    BEPREFIX = "be/"
    output = []
    for instance in instance_list:
      iout = []
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
      for field in self.op.output_fields:
        st_match = self._FIELDS_STATIC.Matches(field)
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = instance.admin_up
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in off_nodes:
            val = "ERROR_nodeoffline"
          elif instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.admin_up:
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.admin_up:
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          idx = ord(field[2]) - ord('a')
          try:
            val = instance.FindDisk(idx).size
          except errors.OpPrereqError:
            val = None
        elif field == "disk_usage": # total disk usage per node
          disk_sizes = [{'size': disk.size} for disk in instance.disks]
          val = _ComputeDiskSize(instance.disk_template, disk_sizes)
        elif field == "tags":
          val = list(instance.GetTags())
        elif field == "serial_no":
          val = instance.serial_no
        elif field == "network_port":
          val = instance.network_port
        elif field == "hypervisor":
          val = instance.hypervisor
        elif field == "hvparams":
          val = i_hv
        elif (field.startswith(HVPREFIX) and
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
          val = i_hv.get(field[len(HVPREFIX):], None)
        elif field == "beparams":
          val = i_be
        elif (field.startswith(BEPREFIX) and
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
          val = i_be.get(field[len(BEPREFIX):], None)
        elif st_match and st_match.groups():
          # matches a variable list
          st_groups = st_match.groups()
          if st_groups and st_groups[0] == "disk":
            if st_groups[1] == "count":
              val = len(instance.disks)
            elif st_groups[1] == "sizes":
              val = [disk.size for disk in instance.disks]
            elif st_groups[1] == "size":
              try:
                val = instance.FindDisk(st_groups[2]).size
              except errors.OpPrereqError:
                val = None
            else:
              assert False, "Unhandled disk parameter"
          elif st_groups[0] == "nic":
            if st_groups[1] == "count":
              val = len(instance.nics)
            elif st_groups[1] == "macs":
              val = [nic.mac for nic in instance.nics]
            elif st_groups[1] == "ips":
              val = [nic.ip for nic in instance.nics]
            elif st_groups[1] == "bridges":
              val = [nic.bridge for nic in instance.nics]
            else:
              # index-based item
              nic_idx = int(st_groups[2])
              if nic_idx >= len(instance.nics):
                val = None
              else:
                if st_groups[1] == "mac":
                  val = instance.nics[nic_idx].mac
                elif st_groups[1] == "ip":
                  val = instance.nics[nic_idx].ip
                elif st_groups[1] == "bridge":
                  val = instance.nics[nic_idx].bridge
                else:
                  assert False, "Unhandled NIC parameter"
          else:
            assert False, "Unhandled variable parameter"
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output


class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    _CheckNodeOnline(self, target_node)
    _CheckNodeNotDrained(self, target_node)
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
                         instance.name, bep[constants.BE_MEMORY],
                         instance.hypervisor)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    result = self.rpc.call_bridges_exist(target_node, brlist)
    result.Raise()
    if not result.data:
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self, dev, target_node, False):
        if instance.admin_up and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 instance.name, source_node)

    result = self.rpc.call_instance_shutdown(source_node, instance)
    msg = result.RemoteFailMsg()
    if msg:
      if self.op.ignore_consistency:
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
                             " Proceeding anyway. Please make sure node"
                             " %s is down. Error details: %s",
                             instance.name, source_node, source_node, msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, source_node, msg))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.admin_up:
      feedback_fn("* activating the instance's disks on target node")
      logging.info("Starting instance %s on node %s",
                   instance.name, target_node)

      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      result = self.rpc.call_instance_start(target_node, instance)
      msg = result.RemoteFailMsg()
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
                                 (instance.name, target_node, msg))


class LUMigrateInstance(LogicalUnit):
  """Migrate an instance.

  This is migration without shutting down, compared to the failover,
  which is done with shutdown.

  """
  HPATH = "instance-migrate"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "live", "cleanup"]

  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["MIGRATE_LIVE"] = self.op.live
    env["MIGRATE_CLEANUP"] = self.op.cleanup
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    if instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " drbd8, cannot migrate.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ConfigurationError("No secondary node but using"
                                      " drbd8 disk template")

    i_be = self.cfg.GetClusterInfo().FillBE(instance)

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
                         instance.name, i_be[constants.BE_MEMORY],
                         instance.hypervisor)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    result = self.rpc.call_bridges_exist(target_node, brlist)
    if result.failed or not result.data:
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

    if not self.op.cleanup:
      _CheckNodeNotDrained(self, target_node)
      result = self.rpc.call_instance_migratable(instance.primary_node,
                                                 instance)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
                                   msg)

    self.instance = instance

  def _WaitUntilSync(self):
    """Poll with custom rpc for disk sync.

    This uses our own step-based rpc call.

    """
    self.feedback_fn("* wait until resync is done")
    all_done = False
    while not all_done:
      all_done = True
      result = self.rpc.call_drbd_wait_sync(self.all_nodes,
                                            self.nodes_ip,
                                            self.instance.disks)
      min_percent = 100
      for node, nres in result.items():
        msg = nres.RemoteFailMsg()
        if msg:
          raise errors.OpExecError("Cannot resync disks on node %s: %s" %
                                   (node, msg))
        node_done, node_percent = nres.payload
        all_done = all_done and node_done
        if node_percent is not None:
          min_percent = min(min_percent, node_percent)
      if not all_done:
        if min_percent < 100:
          self.feedback_fn("   - progress: %.1f%%" % min_percent)
        time.sleep(2)

  def _EnsureSecondary(self, node):
    """Demote a node to secondary.

    """
    self.feedback_fn("* switching node %s to secondary mode" % node)

    for dev in self.instance.disks:
      self.cfg.SetDiskID(dev, node)

    result = self.rpc.call_blockdev_close(node, self.instance.name,
                                          self.instance.disks)
    msg = result.RemoteFailMsg()
    if msg:
      raise errors.OpExecError("Cannot change disk to secondary on node %s,"
                               " error %s" % (node, msg))

  def _GoStandalone(self):
    """Disconnect from the network.

    """
    self.feedback_fn("* changing into standalone mode")
    result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
                                               self.instance.disks)
    for node, nres in result.items():
      msg = nres.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Cannot disconnect disks node %s,"
                                 " error %s" % (node, msg))

  def _GoReconnect(self, multimaster):
    """Reconnect to the network.

    """
    if multimaster:
      msg = "dual-master"
    else:
      msg = "single-master"
    self.feedback_fn("* changing disks into %s mode" % msg)
    result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
                                           self.instance.disks,
                                           self.instance.name, multimaster)
    for node, nres in result.items():
      msg = nres.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Cannot change disks config on node %s,"
                                 " error: %s" % (node, msg))

  def _ExecCleanup(self):
    """Try to cleanup after a failed migration.

    The cleanup is done by:
      - check that the instance is running only on one node
        (and update the config if needed)
      - change disks on its secondary node to secondary
      - wait until disks are fully synchronized
      - disconnect from the network
      - change disks into single-master mode
      - wait again until disks are fully synchronized

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    # check running on only one node
    self.feedback_fn("* checking where the instance actually runs"
                     " (if this hangs, the hypervisor might be in"
                     " a bad state)")
    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
    for node, result in ins_l.items():
      result.Raise()
      if not isinstance(result.data, list):
        raise errors.OpExecError("Can't contact node '%s'" % node)

    runningon_source = instance.name in ins_l[source_node].data
    runningon_target = instance.name in ins_l[target_node].data

    if runningon_source and runningon_target:
      raise errors.OpExecError("Instance seems to be running on two nodes,"
                               " or the hypervisor is confused. You will have"
                               " to ensure manually that it runs only on one"
                               " and restart this operation.")

    if not (runningon_source or runningon_target):
      raise errors.OpExecError("Instance does not seem to be running at all."
                               " In this case, it's safer to repair by"
                               " running 'gnt-instance stop' to ensure disk"
                               " shutdown, and then restarting it.")

    if runningon_target:
      # the migration has actually succeeded, we need to update the config
      self.feedback_fn("* instance running on secondary node (%s),"
                       " updating config" % target_node)
      instance.primary_node = target_node
      self.cfg.Update(instance)
      demoted_node = source_node
    else:
      self.feedback_fn("* instance confirmed to be running on its"
                       " primary node (%s)" % source_node)
      demoted_node = target_node

    self._EnsureSecondary(demoted_node)
    try:
      self._WaitUntilSync()
    except errors.OpExecError:
      # we ignore errors here, since if the device is standalone, it
      # won't be able to sync
      pass
    self._GoStandalone()
    self._GoReconnect(False)
    self._WaitUntilSync()

    self.feedback_fn("* done")

  def _RevertDiskStatus(self):
    """Try to revert the disk status after a failed migration.

    """
    target_node = self.target_node
    try:
      self._EnsureSecondary(target_node)
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()
    except errors.OpExecError, err:
      self.LogWarning("Migration failed and I can't reconnect the"
                      " drives: error '%s'\n"
                      "Please look and recover the instance status" %
                      str(err))

  def _AbortMigration(self):
    """Call the hypervisor code to abort a started migration.

    """
    instance = self.instance
    target_node = self.target_node
    migration_info = self.migration_info

    abort_result = self.rpc.call_finalize_migration(target_node,
                                                    instance,
                                                    migration_info,
                                                    False)
    abort_msg = abort_result.RemoteFailMsg()
    if abort_msg:
      logging.error("Aborting migration failed on target node %s: %s" %
                    (target_node, abort_msg))
      # Don't raise an exception here, as we still have to try to revert the
      # disk status, even if this step failed.

  def _ExecMigration(self):
    """Migrate an instance.

    The migration is done by:
      - change the disks into dual-master mode
      - wait until disks are fully synchronized again
      - migrate the instance
      - change disks on the new secondary node (the old primary) to secondary
      - wait until disks are fully synchronized
      - change disks into single-master mode

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    self.feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      if not _CheckDiskConsistency(self, dev, target_node, False):
        raise errors.OpExecError("Disk %s is degraded or not fully"
                                 " synchronized on target node,"
                                 " aborting migrate." % dev.iv_name)

    # First get the migration information from the remote node
    result = self.rpc.call_migration_info(source_node, instance)
    msg = result.RemoteFailMsg()
    if msg:
      log_err = ("Failed fetching source migration information from %s: %s" %
                 (source_node, msg))
      logging.error(log_err)
      raise errors.OpExecError(log_err)

    self.migration_info = migration_info = result.payload

    # Then switch the disks to master/master mode
    self._EnsureSecondary(target_node)
    self._GoStandalone()
    self._GoReconnect(True)
    self._WaitUntilSync()

    self.feedback_fn("* preparing %s to accept the instance" % target_node)
    result = self.rpc.call_accept_instance(target_node,
                                           instance,
                                           migration_info,
                                           self.nodes_ip[target_node])

    msg = result.RemoteFailMsg()
    if msg:
      logging.error("Instance pre-migration failed, trying to revert"
                    " disk status: %s", msg)
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
                               (instance.name, msg))

    self.feedback_fn("* migrating instance to %s" % target_node)
    time.sleep(10)
    result = self.rpc.call_instance_migrate(source_node, instance,
                                            self.nodes_ip[target_node],
                                            self.op.live)
    msg = result.RemoteFailMsg()
    if msg:
      logging.error("Instance migration failed, trying to revert"
                    " disk status: %s", msg)
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not migrate instance %s: %s" %
                               (instance.name, msg))
    time.sleep(10)

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    result = self.rpc.call_finalize_migration(target_node,
                                              instance,
                                              migration_info,
                                              True)
    msg = result.RemoteFailMsg()
    if msg:
      logging.error("Instance migration succeeded, but finalization failed:"
                    " %s" % msg)
      raise errors.OpExecError("Could not finalize instance migration: %s" %
                               msg)

    self._EnsureSecondary(source_node)
    self._WaitUntilSync()
    self._GoStandalone()
    self._GoReconnect(False)
    self._WaitUntilSync()

    self.feedback_fn("* done")

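  # Exec below dispatches to _ExecCleanup or _ExecMigration depending on
  # self.op.cleanup; roughly, the migration path drives the DRBD disks
  # through: demote target to secondary -> standalone -> reconnect in
  # dual-master mode for the live migration -> demote the old primary ->
  # reconnect in single-master mode, waiting for resync after each reconnect.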
  def Exec(self, feedback_fn):
    """Perform the migration.

    """
    self.feedback_fn = feedback_fn

    self.source_node = self.instance.primary_node
    self.target_node = self.instance.secondary_nodes[0]
    self.all_nodes = [self.source_node, self.target_node]
    self.nodes_ip = {
      self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
      self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
      }
    if self.op.cleanup:
      return self._ExecCleanup()
    else:
      return self._ExecMigration()


def _CreateBlockDev(lu, node, instance, device, force_create,
                    info, force_open):
  """Create a tree of block devices on a given node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  @param lu: the lu on whose behalf we execute
  @param node: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @type force_create: boolean
  @param force_create: whether to force creation of this device; this
      will be changed to True whenever we find a device for which
      CreateOnSecondary() is true
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution

  """
  if device.CreateOnSecondary():
    force_create = True

  if device.children:
    for child in device.children:
      _CreateBlockDev(lu, node, instance, child, force_create,
                      info, force_open)

  if not force_create:
    return

  _CreateSingleBlockDev(lu, node, instance, device, info, force_open)


def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
  """Create a single block device on a given node.

  This will not recurse over children of the device, so they must be
  created in advance.

  @param lu: the lu on whose behalf we execute
  @param node: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution

  """
  lu.cfg.SetDiskID(device, node)
  result = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, force_open, info)
  msg = result.RemoteFailMsg()
  if msg:
    raise errors.OpExecError("Can't create block device %s on"
                             " node %s for instance %s: %s" %
                             (device, node, instance.name, msg))
  if device.physical_id is None:
    device.physical_id = result.payload


def _GenerateUniqueNames(lu, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results


def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
                         p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  port = lu.cfg.AllocatePort()
  vgname = lu.cfg.GetVGName()
  shared_secret = lu.cfg.GenerateDRBDSecret()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev


def _GenerateDiskTemplate(lu, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_info,
                          file_storage_dir, file_driver,
                          base_index):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []
  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(lu, [".disk%d" % i
                                      for i in range(disk_count)])
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
                              logical_id=(vgname, names[idx]),
                              iv_name="disk/%d" % disk_index,
                              mode=disk["mode"])
      disks.append(disk_dev)
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node, remote_node] * len(disk_info), instance_name)

    names = []
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
                                               for i in range(disk_count)]):
      names.append(lv_prefix + "_data")
      names.append(lv_prefix + "_meta")
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                      disk["size"], names[idx*2:idx*2+2],
                                      "disk/%d" % disk_index,
                                      minors[idx*2], minors[idx*2+1])
      disk_dev.mode = disk["mode"]
      disks.append(disk_dev)
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
                              iv_name="disk/%d" % disk_index,
                              logical_id=(file_driver,
                                          "%s/disk%d" % (file_storage_dir,
                                                         disk_index)),
                              mode=disk["mode"])
      disks.append(disk_dev)
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks


def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


def _CreateDisks(lu, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @rtype: boolean
  @return: the success of the creation

  """
  info = _GetInstanceInfoText(instance)
  pnode = instance.primary_node

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)

    if result.failed or not result.data:
      raise errors.OpExecError("Could not connect to node '%s'" % pnode)

    if not result.data[0]:
      raise errors.OpExecError("Failed to create directory '%s'" %
                               file_storage_dir)

  # Note: this needs to be kept in sync with adding of disks in
  # LUSetInstanceParams
  for device in instance.disks:
    logging.info("Creating volume %s for instance %s",
                 device.iv_name, instance.name)
    #HARDCODE
    for node in instance.all_nodes:
      f_create = node == pnode
      _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)


def _RemoveDisks(lu, instance):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should remove
  @rtype: boolean
  @return: the success of the removal

  """
  logging.info("Removing block devices for instance %s", instance.name)

  all_result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(disk, node)
      msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
      if msg:
        lu.LogWarning("Could not remove block device %s on node %s,"
                      " continuing anyway: %s", device.iv_name, node, msg)
        all_result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
                                                 file_storage_dir)
    if result.failed or not result.data:
      logging.error("Could not remove directory '%s'", file_storage_dir)
      all_result = False

  return all_result


def _ComputeDiskSize(disk_template, disks):
  """Compute disk size requirements in the volume group

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: sum(d["size"] for d in disks),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
    constants.DT_FILE: None,
  }
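  # Worked example (illustrative): for two 10240 MB disks this yields
  # 20480 for DT_PLAIN but 2 * (10240 + 128) = 20736 for DT_DRBD8, because
  # of the per-disk DRBD metadata volume.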

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" %  disk_template)

  return req_size_dict[disk_template]


def _CheckHVParams(lu, nodenames, hvname, hvparams):
  """Hypervisor parameter validation.

  This function abstracts the hypervisor parameter validation to be
  used in both instance create and instance modify.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type nodenames: list
  @param nodenames: the list of nodes on which we should check
  @type hvname: string
  @param hvname: the name of the hypervisor we should use
  @type hvparams: dict
  @param hvparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
                                                  hvname,
                                                  hvparams)
  for node in nodenames:
    info = hvinfo[node]
    if info.offline:
      continue
    msg = info.RemoteFailMsg()
    if msg:
      raise errors.OpPrereqError("Hypervisor parameter validation failed:"
                                 " %s" % msg)


class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disks", "disk_template",
              "mode", "start",
              "wait_for_sync", "ip_check", "nics",
              "hvparams", "beparams"]
  REQ_BGL = False
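  # Note on the opcode format (illustrative, based on the parsing in
  # ExpandNames below): "disks" and "nics" are lists of dicts, e.g.
  #   disks=[{"size": 10240, "mode": "rw"}]
  #   nics=[{"ip": "auto", "mac": "auto", "bridge": None}]
  # where an "auto" IP resolves to the instance's own address, an "auto" MAC
  # is left for Ganeti to generate, and a missing bridge falls back to the
  # cluster's default bridge.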
4238

    
4239
  def _ExpandNode(self, node):
4240
    """Expands and checks one node name.
4241

4242
    """
4243
    node_full = self.cfg.ExpandNodeName(node)
4244
    if node_full is None:
4245
      raise errors.OpPrereqError("Unknown node %s" % node)
4246
    return node_full
4247

    
4248
  def ExpandNames(self):
4249
    """ExpandNames for CreateInstance.
4250

4251
    Figure out the right locks for instance creation.
4252

4253
    """
4254
    self.needed_locks = {}
4255

    
4256
    # set optional parameters to none if they don't exist
4257
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4258
      if not hasattr(self.op, attr):
4259
        setattr(self.op, attr, None)
4260

    
4261
    # cheap checks, mostly valid constants given
4262

    
4263
    # verify creation mode
4264
    if self.op.mode not in (constants.INSTANCE_CREATE,
4265
                            constants.INSTANCE_IMPORT):
4266
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4267
                                 self.op.mode)
4268

    
4269
    # disk template and mirror node verification
4270
    if self.op.disk_template not in constants.DISK_TEMPLATES:
4271
      raise errors.OpPrereqError("Invalid disk template name")
4272

    
4273
    if self.op.hypervisor is None:
4274
      self.op.hypervisor = self.cfg.GetHypervisorType()
4275

    
4276
    cluster = self.cfg.GetClusterInfo()
4277
    enabled_hvs = cluster.enabled_hypervisors
4278
    if self.op.hypervisor not in enabled_hvs:
4279
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4280
                                 " cluster (%s)" % (self.op.hypervisor,
4281
                                  ",".join(enabled_hvs)))
4282

    
4283
    # check hypervisor parameter syntax (locally)
4284
    utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4285
    filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4286
                                  self.op.hvparams)
4287
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4288
    hv_type.CheckParameterSyntax(filled_hvp)
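    # Only the syntax of the filled hvparams is validated here (locally);
    # the per-node semantic validation happens later in CheckPrereq via
    # _CheckHVParams, which RPCs into every target node.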
4289

    
4290
    # fill and remember the beparams dict
4291
    utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4292
    self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4293
                                    self.op.beparams)
4294

    
4295
    #### instance parameters check
4296

    
4297
    # instance name verification
4298
    hostname1 = utils.HostInfo(self.op.instance_name)
4299
    self.op.instance_name = instance_name = hostname1.name
4300

    
4301
    # this is just a preventive check, but someone might still add this
4302
    # instance in the meantime, and creation will fail at lock-add time
4303
    if instance_name in self.cfg.GetInstanceList():
4304
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4305
                                 instance_name)
4306

    
4307
    self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4308

    
4309
    # NIC buildup
4310
    self.nics = []
4311
    for nic in self.op.nics:
4312
      # ip validity checks
4313
      ip = nic.get("ip", None)
4314
      if ip is None or ip.lower() == "none":
4315
        nic_ip = None
4316
      elif ip.lower() == constants.VALUE_AUTO:
4317
        nic_ip = hostname1.ip
4318
      else:
4319
        if not utils.IsValidIP(ip):
4320
          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4321
                                     " like a valid IP" % ip)
4322
        nic_ip = ip
4323

    
4324
      # MAC address verification
4325
      mac = nic.get("mac", constants.VALUE_AUTO)
4326
      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4327
        if not utils.IsValidMac(mac.lower()):
4328
          raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4329
                                     mac)
4330
      # bridge verification
4331
      bridge = nic.get("bridge", None)
4332
      if bridge is None:
4333
        bridge = self.cfg.GetDefBridge()
4334
      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
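    # Illustrative input shape (a sketch, not the opcode definition): each
    # entry of self.op.nics is a dict such as
    #   {"ip": None, "mac": constants.VALUE_AUTO, "bridge": None}
    # MACs left as VALUE_AUTO/VALUE_GENERATE are replaced with generated
    # addresses in CheckPrereq; a missing bridge falls back to the cluster
    # default bridge.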
4335

    
4336
    # disk checks/pre-build
4337
    self.disks = []
4338
    for disk in self.op.disks:
4339
      mode = disk.get("mode", constants.DISK_RDWR)
4340
      if mode not in constants.DISK_ACCESS_SET:
4341
        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4342
                                   mode)
4343
      size = disk.get("size", None)
4344
      if size is None:
4345
        raise errors.OpPrereqError("Missing disk size")
4346
      try:
4347
        size = int(size)
4348
      except ValueError:
4349
        raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4350
      self.disks.append({"size": size, "mode": mode})
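    # Illustrative input shape (a sketch): each entry of self.op.disks is a
    # dict such as {"size": 1024, "mode": constants.DISK_RDWR}; sizes are
    # expressed in MB and normalized to integers above.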
4351

    
4352
    # used in CheckPrereq for ip ping check
4353
    self.check_ip = hostname1.ip
4354

    
4355
    # file storage checks
4356
    if (self.op.file_driver and
4357
        not self.op.file_driver in constants.FILE_DRIVER):
4358
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
4359
                                 self.op.file_driver)
4360

    
4361
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4362
      raise errors.OpPrereqError("File storage directory path not absolute")
4363

    
4364
    ### Node/iallocator related checks
4365
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
4366
      raise errors.OpPrereqError("One and only one of iallocator and primary"
4367
                                 " node must be given")
4368

    
4369
    if self.op.iallocator:
4370
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4371
    else:
4372
      self.op.pnode = self._ExpandNode(self.op.pnode)
4373
      nodelist = [self.op.pnode]
4374
      if self.op.snode is not None:
4375
        self.op.snode = self._ExpandNode(self.op.snode)
4376
        nodelist.append(self.op.snode)
4377
      self.needed_locks[locking.LEVEL_NODE] = nodelist
4378

    
4379
    # in case of import lock the source node too
4380
    if self.op.mode == constants.INSTANCE_IMPORT:
4381
      src_node = getattr(self.op, "src_node", None)
4382
      src_path = getattr(self.op, "src_path", None)
4383

    
4384
      if src_path is None:
4385
        self.op.src_path = src_path = self.op.instance_name
4386

    
4387
      if src_node is None:
4388
        self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4389
        self.op.src_node = None
4390
        if os.path.isabs(src_path):
4391
          raise errors.OpPrereqError("Importing an instance from an absolute"
4392
                                     " path requires a source node option.")
4393
      else:
4394
        self.op.src_node = src_node = self._ExpandNode(src_node)
4395
        if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4396
          self.needed_locks[locking.LEVEL_NODE].append(src_node)
4397
        if not os.path.isabs(src_path):
4398
          self.op.src_path = src_path = \
4399
            os.path.join(constants.EXPORT_DIR, src_path)
4400

    
4401
    else: # INSTANCE_CREATE
4402
      if getattr(self.op, "os_type", None) is None:
4403
        raise errors.OpPrereqError("No guest OS specified")
4404

    
4405
  def _RunAllocator(self):
4406
    """Run the allocator based on input opcode.
4407

4408
    """
4409
    nics = [n.ToDict() for n in self.nics]
4410
    ial = IAllocator(self,
4411
                     mode=constants.IALLOCATOR_MODE_ALLOC,
4412
                     name=self.op.instance_name,
4413
                     disk_template=self.op.disk_template,
4414
                     tags=[],
4415
                     os=self.op.os_type,
4416
                     vcpus=self.be_full[constants.BE_VCPUS],
4417
                     mem_size=self.be_full[constants.BE_MEMORY],
4418
                     disks=self.disks,
4419
                     nics=nics,
4420
                     hypervisor=self.op.hypervisor,
4421
                     )
4422

    
4423
    ial.Run(self.op.iallocator)
4424

    
4425
    if not ial.success:
4426
      raise errors.OpPrereqError("Can't compute nodes using"
4427
                                 " iallocator '%s': %s" % (self.op.iallocator,
4428
                                                           ial.info))
4429
    if len(ial.nodes) != ial.required_nodes:
4430
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4431
                                 " of nodes (%s), required %s" %
4432
                                 (self.op.iallocator, len(ial.nodes),
4433
                                  ial.required_nodes))
4434
    self.op.pnode = ial.nodes[0]
4435
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4436
                 self.op.instance_name, self.op.iallocator,
4437
                 ", ".join(ial.nodes))
4438
    if ial.required_nodes == 2:
4439
      self.op.snode = ial.nodes[1]
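    # Note (descriptive): required_nodes is reported by the allocator; it is
    # 2 for templates that need a secondary node (the net-mirrored/drbd
    # case), in which case the second returned node becomes the secondary.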
4440

    
4441
  def BuildHooksEnv(self):
4442
    """Build hooks env.
4443

4444
    This runs on master, primary and secondary nodes of the instance.
4445

4446
    """
4447
    env = {
4448
      "ADD_MODE": self.op.mode,
4449
      }
4450
    if self.op.mode == constants.INSTANCE_IMPORT:
4451
      env["SRC_NODE"] = self.op.src_node
4452
      env["SRC_PATH"] = self.op.src_path
4453
      env["SRC_IMAGES"] = self.src_images
4454

    
4455
    env.update(_BuildInstanceHookEnv(
4456
      name=self.op.instance_name,
4457
      primary_node=self.op.pnode,
4458
      secondary_nodes=self.secondaries,
4459
      status=self.op.start,
4460
      os_type=self.op.os_type,
4461
      memory=self.be_full[constants.BE_MEMORY],
4462
      vcpus=self.be_full[constants.BE_VCPUS],
4463
      nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4464
      disk_template=self.op.disk_template,
4465
      disks=[(d["size"], d["mode"]) for d in self.disks],
4466
    ))
4467

    
4468
    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4469
          self.secondaries)
4470
    return env, nl, nl
4471

    
4472

    
4473
  def CheckPrereq(self):
4474
    """Check prerequisites.
4475

4476
    """
4477
    if (not self.cfg.GetVGName() and
4478
        self.op.disk_template not in constants.DTS_NOT_LVM):
4479
      raise errors.OpPrereqError("Cluster does not support lvm-based"
4480
                                 " instances")
4481

    
4482
    if self.op.mode == constants.INSTANCE_IMPORT:
4483
      src_node = self.op.src_node
4484
      src_path = self.op.src_path
4485

    
4486
      if src_node is None:
4487
        exp_list = self.rpc.call_export_list(
4488
          self.acquired_locks[locking.LEVEL_NODE])
4489
        found = False
4490
        for node in exp_list:
4491
          if not exp_list[node].failed and src_path in exp_list[node].data:
4492
            found = True
4493
            self.op.src_node = src_node = node
4494
            self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4495
                                                       src_path)
4496
            break
4497
        if not found:
4498
          raise errors.OpPrereqError("No export found for relative path %s" %
4499
                                      src_path)
4500

    
4501
      _CheckNodeOnline(self, src_node)
4502
      result = self.rpc.call_export_info(src_node, src_path)
4503
      result.Raise()
4504
      if not result.data:
4505
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
4506

    
4507
      export_info = result.data
4508
      if not export_info.has_section(constants.INISECT_EXP):
4509
        raise errors.ProgrammerError("Corrupted export config")
4510

    
4511
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
4512
      if (int(ei_version) != constants.EXPORT_VERSION):
4513
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4514
                                   (ei_version, constants.EXPORT_VERSION))
4515

    
4516
      # Check that the new instance doesn't have less disks than the export
4517
      instance_disks = len(self.disks)
4518
      export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4519
      if instance_disks < export_disks:
4520
        raise errors.OpPrereqError("Not enough disks to import."
4521
                                   " (instance: %d, export: %d)" %
4522
                                   (instance_disks, export_disks))
4523

    
4524
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4525
      disk_images = []
4526
      for idx in range(export_disks):
4527
        option = 'disk%d_dump' % idx
4528
        if export_info.has_option(constants.INISECT_INS, option):
4529
          # FIXME: are the old os-es, disk sizes, etc. useful?
4530
          export_name = export_info.get(constants.INISECT_INS, option)
4531
          image = os.path.join(src_path, export_name)
4532
          disk_images.append(image)
4533
        else:
4534
          disk_images.append(False)
4535

    
4536
      self.src_images = disk_images
4537

    
4538
      old_name = export_info.get(constants.INISECT_INS, 'name')
4539
      # FIXME: int() here could throw a ValueError on broken exports
4540
      exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4541
      if self.op.instance_name == old_name:
4542
        for idx, nic in enumerate(self.nics):
4543
          if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx + 1:
4544
            nic_mac_ini = 'nic%d_mac' % idx
4545
            nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4546

    
4547
    # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4548
    # ip ping checks (we use the same ip that was resolved in ExpandNames)
4549
    if self.op.start and not self.op.ip_check:
4550
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4551
                                 " adding an instance in start mode")
4552

    
4553
    if self.op.ip_check:
4554
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4555
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
4556
                                   (self.check_ip, self.op.instance_name))
4557

    
4558
    #### mac address generation
4559
    # By generating here the mac address both the allocator and the hooks get
4560
    # the real final mac address rather than the 'auto' or 'generate' value.
4561
    # There is a race condition between the generation and the instance object
4562
    # creation, which means that we know the mac is valid now, but we're not
4563
    # sure it will be when we actually add the instance. If things go bad
4564
    # adding the instance will abort because of a duplicate mac, and the
4565
    # creation job will fail.
4566
    for nic in self.nics:
4567
      if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4568
        nic.mac = self.cfg.GenerateMAC()
4569

    
4570
    #### allocator run
4571

    
4572
    if self.op.iallocator is not None:
4573
      self._RunAllocator()
4574

    
4575
    #### node related checks
4576

    
4577
    # check primary node
4578
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4579
    assert self.pnode is not None, \
4580
      "Cannot retrieve locked node %s" % self.op.pnode
4581
    if pnode.offline:
4582
      raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4583
                                 pnode.name)
4584
    if pnode.drained:
4585
      raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4586
                                 pnode.name)
4587

    
4588
    self.secondaries = []
4589

    
4590
    # mirror node verification
4591
    if self.op.disk_template in constants.DTS_NET_MIRROR:
4592
      if self.op.snode is None:
4593
        raise errors.OpPrereqError("The networked disk templates need"
4594
                                   " a mirror node")
4595
      if self.op.snode == pnode.name:
4596
        raise errors.OpPrereqError("The secondary node cannot be"
4597
                                   " the primary node.")
4598
      _CheckNodeOnline(self, self.op.snode)
4599
      _CheckNodeNotDrained(self, self.op.snode)
4600
      self.secondaries.append(self.op.snode)
4601

    
4602
    nodenames = [pnode.name] + self.secondaries
4603

    
4604
    req_size = _ComputeDiskSize(self.op.disk_template,
4605
                                self.disks)
4606

    
4607
    # Check lv size requirements
4608
    if req_size is not None:
4609
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4610
                                         self.op.hypervisor)
4611
      for node in nodenames:
4612
        info = nodeinfo[node]
4613
        info.Raise()
4614
        info = info.data
4615
        if not info:
4616
          raise errors.OpPrereqError("Cannot get current information"
4617
                                     " from node '%s'" % node)
4618
        vg_free = info.get('vg_free', None)
4619
        if not isinstance(vg_free, int):
4620
          raise errors.OpPrereqError("Can't compute free disk space on"
4621
                                     " node %s" % node)
4622
        if req_size > info['vg_free']:
4623
          raise errors.OpPrereqError("Not enough disk space on target node %s."
4624
                                     " %d MB available, %d MB required" %
4625
                                     (node, info['vg_free'], req_size))
4626

    
4627
    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4628

    
4629
    # os verification
4630
    result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4631
    result.Raise()
4632
    if not isinstance(result.data, objects.OS):
4633
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
4634
                                 " primary node"  % self.op.os_type)
4635

    
4636
    # bridge check on primary node
4637
    bridges = [n.bridge for n in self.nics]
4638
    result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4639
    result.Raise()
4640
    if not result.data:
4641
      raise errors.OpPrereqError("One of the target bridges '%s' does not"
4642
                                 " exist on destination node '%s'" %
4643
                                 (",".join(bridges), pnode.name))
4644

    
4645
    # memory check on primary node
4646
    if self.op.start:
4647
      _CheckNodeFreeMemory(self, self.pnode.name,
4648
                           "creating instance %s" % self.op.instance_name,
4649
                           self.be_full[constants.BE_MEMORY],
4650
                           self.op.hypervisor)
4651

    
4652
  def Exec(self, feedback_fn):
4653
    """Create and add the instance to the cluster.
4654

4655
    """
4656
    instance = self.op.instance_name
4657
    pnode_name = self.pnode.name
4658

    
4659
    ht_kind = self.op.hypervisor
4660
    if ht_kind in constants.HTS_REQ_PORT:
4661
      network_port = self.cfg.AllocatePort()
4662
    else:
4663
      network_port = None
4664

    
4665
    ##if self.op.vnc_bind_address is None:
4666
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4667

    
4668
    # this is needed because os.path.join does not accept None arguments
4669
    if self.op.file_storage_dir is None:
4670
      string_file_storage_dir = ""
4671
    else:
4672
      string_file_storage_dir = self.op.file_storage_dir
4673

    
4674
    # build the full file storage dir path
4675
    file_storage_dir = os.path.normpath(os.path.join(
4676
                                        self.cfg.GetFileStorageDir(),
4677
                                        string_file_storage_dir, instance))
4678

    
4679

    
4680
    disks = _GenerateDiskTemplate(self,
4681
                                  self.op.disk_template,
4682
                                  instance, pnode_name,
4683
                                  self.secondaries,
4684
                                  self.disks,
4685
                                  file_storage_dir,
4686
                                  self.op.file_driver,
4687
                                  0)
4688

    
4689
    iobj = objects.Instance(name=instance, os=self.op.os_type,
4690
                            primary_node=pnode_name,
4691
                            nics=self.nics, disks=disks,
4692
                            disk_template=self.op.disk_template,
4693
                            admin_up=False,
4694
                            network_port=network_port,
4695
                            beparams=self.op.beparams,
4696
                            hvparams=self.op.hvparams,
4697
                            hypervisor=self.op.hypervisor,
4698
                            )
4699

    
4700
    feedback_fn("* creating instance disks...")
4701
    try:
4702
      _CreateDisks(self, iobj)
4703
    except errors.OpExecError:
4704
      self.LogWarning("Device creation failed, reverting...")
4705
      try:
4706
        _RemoveDisks(self, iobj)
4707
      finally:
4708
        self.cfg.ReleaseDRBDMinors(instance)
4709
        raise
4710

    
4711
    feedback_fn("adding instance %s to cluster config" % instance)
4712

    
4713
    self.cfg.AddInstance(iobj)
4714
    # Declare that we don't want to remove the instance lock anymore, as we've
4715
    # added the instance to the config
4716
    del self.remove_locks[locking.LEVEL_INSTANCE]
4717
    # Unlock all the nodes
4718
    if self.op.mode == constants.INSTANCE_IMPORT:
4719
      nodes_keep = [self.op.src_node]
4720
      nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4721
                       if node != self.op.src_node]
4722
      self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4723
      self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4724
    else:
4725
      self.context.glm.release(locking.LEVEL_NODE)
4726
      del self.acquired_locks[locking.LEVEL_NODE]
4727

    
4728
    if self.op.wait_for_sync:
4729
      disk_abort = not _WaitForSync(self, iobj)
4730
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
4731
      # make sure the disks are not degraded (still sync-ing is ok)
4732
      time.sleep(15)
4733
      feedback_fn("* checking mirrors status")
4734
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4735
    else:
4736
      disk_abort = False
4737

    
4738
    if disk_abort:
4739
      _RemoveDisks(self, iobj)
4740
      self.cfg.RemoveInstance(iobj.name)
4741
      # Make sure the instance lock gets removed
4742
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4743
      raise errors.OpExecError("There are some degraded disks for"
4744
                               " this instance")
4745

    
4746
    feedback_fn("creating os for instance %s on node %s" %
4747
                (instance, pnode_name))
4748

    
4749
    if iobj.disk_template != constants.DT_DISKLESS:
4750
      if self.op.mode == constants.INSTANCE_CREATE:
4751
        feedback_fn("* running the instance OS create scripts...")
4752
        result = self.rpc.call_instance_os_add(pnode_name, iobj)
4753
        msg = result.RemoteFailMsg()
4754
        if msg:
4755
          raise errors.OpExecError("Could not add os for instance %s"
4756
                                   " on node %s: %s" %
4757
                                   (instance, pnode_name, msg))
4758

    
4759
      elif self.op.mode == constants.INSTANCE_IMPORT:
4760
        feedback_fn("* running the instance OS import scripts...")
4761
        src_node = self.op.src_node
4762
        src_images = self.src_images
4763
        cluster_name = self.cfg.GetClusterName()
4764
        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4765
                                                         src_node, src_images,
4766
                                                         cluster_name)
4767
        import_result.Raise()
4768
        for idx, result in enumerate(import_result.data):
4769
          if not result:
4770
            self.LogWarning("Could not import the image %s for instance"
4771
                            " %s, disk %d, on node %s" %
4772
                            (src_images[idx], instance, idx, pnode_name))
4773
      else:
4774
        # also checked in the prereq part
4775
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4776
                                     % self.op.mode)
4777

    
4778
    if self.op.start:
4779
      iobj.admin_up = True
4780
      self.cfg.Update(iobj)
4781
      logging.info("Starting instance %s on node %s", instance, pnode_name)
4782
      feedback_fn("* starting instance...")
4783
      result = self.rpc.call_instance_start(pnode_name, iobj)
4784
      msg = result.RemoteFailMsg()
4785
      if msg:
4786
        raise errors.OpExecError("Could not start instance: %s" % msg)
4787

    
4788

    
4789
class LUConnectConsole(NoHooksLU):
4790
  """Connect to an instance's console.
4791

4792
  This is somewhat special in that it returns the command line that
4793
  you need to run on the master node in order to connect to the
4794
  console.
4795

4796
  """
4797
  _OP_REQP = ["instance_name"]
4798
  REQ_BGL = False
4799

    
4800
  def ExpandNames(self):
4801
    self._ExpandAndLockInstance()
4802

    
4803
  def CheckPrereq(self):
4804
    """Check prerequisites.
4805

4806
    This checks that the instance is in the cluster.
4807

4808
    """
4809
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4810
    assert self.instance is not None, \
4811
      "Cannot retrieve locked instance %s" % self.op.instance_name
4812
    _CheckNodeOnline(self, self.instance.primary_node)
4813

    
4814
  def Exec(self, feedback_fn):
4815
    """Connect to the console of an instance
4816

4817
    """
4818
    instance = self.instance
4819
    node = instance.primary_node
4820

    
4821
    node_insts = self.rpc.call_instance_list([node],
4822
                                             [instance.hypervisor])[node]
4823
    node_insts.Raise()
4824

    
4825
    if instance.name not in node_insts.data:
4826
      raise errors.OpExecError("Instance %s is not running." % instance.name)
4827

    
4828
    logging.debug("Connecting to console of %s on %s", instance.name, node)
4829

    
4830
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
4831
    cluster = self.cfg.GetClusterInfo()
4832
    # beparams and hvparams are passed separately, to avoid editing the
4833
    # instance and then saving the defaults in the instance itself.
4834
    hvparams = cluster.FillHV(instance)
4835
    beparams = cluster.FillBE(instance)
4836
    console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
4837

    
4838
    # build ssh cmdline
4839
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
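    # The return value is a complete ssh command line; as the class docstring
    # notes, it is meant to be run on the master node to reach the instance's
    # console on its primary node.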
4840

    
4841

    
4842
class LUReplaceDisks(LogicalUnit):
4843
  """Replace the disks of an instance.
4844

4845
  """
4846
  HPATH = "mirrors-replace"
4847
  HTYPE = constants.HTYPE_INSTANCE
4848
  _OP_REQP = ["instance_name", "mode", "disks"]
4849
  REQ_BGL = False
4850

    
4851
  def CheckArguments(self):
4852
    if not hasattr(self.op, "remote_node"):
4853
      self.op.remote_node = None
4854
    if not hasattr(self.op, "iallocator"):
4855
      self.op.iallocator = None
4856

    
4857
    # check for valid parameter combination
4858
    cnt = [self.op.remote_node, self.op.iallocator].count(None)
4859
    if self.op.mode == constants.REPLACE_DISK_CHG:
4860
      if cnt == 2:
4861
        raise errors.OpPrereqError("When changing the secondary either an"
4862
                                   " iallocator script must be used or the"
4863
                                   " new node given")
4864
      elif cnt == 0:
4865
        raise errors.OpPrereqError("Give either the iallocator or the new"
4866
                                   " secondary, not both")
4867
    else: # not replacing the secondary
4868
      if cnt != 2:
4869
        raise errors.OpPrereqError("The iallocator and new node options can"
4870
                                   " be used only when changing the"
4871
                                   " secondary node")
4872

    
4873
  def ExpandNames(self):
4874
    self._ExpandAndLockInstance()
4875

    
4876
    if self.op.iallocator is not None:
4877
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4878
    elif self.op.remote_node is not None:
4879
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4880
      if remote_node is None:
4881
        raise errors.OpPrereqError("Node '%s' not known" %
4882
                                   self.op.remote_node)
4883
      self.op.remote_node = remote_node
4884
      # Warning: do not remove the locking of the new secondary here
4885
      # unless DRBD8.AddChildren is changed to work in parallel;
4886
      # currently it doesn't since parallel invocations of
4887
      # FindUnusedMinor will conflict
4888
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4889
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4890
    else:
4891
      self.needed_locks[locking.LEVEL_NODE] = []
4892
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
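    # Locking summary: with an iallocator we lock all nodes (the new
    # secondary is not known yet); with an explicit remote_node we append it
    # to the instance's own nodes; otherwise only the instance's nodes are
    # locked (LOCKS_REPLACE).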
4893

    
4894
  def DeclareLocks(self, level):
4895
    # If we're not already locking all nodes in the set we have to declare the
4896
    # instance's primary/secondary nodes.
4897
    if (level == locking.LEVEL_NODE and
4898
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4899
      self._LockInstancesNodes()
4900

    
4901
  def _RunAllocator(self):
4902
    """Compute a new secondary node using an IAllocator.
4903

4904
    """
4905
    ial = IAllocator(self,
4906
                     mode=constants.IALLOCATOR_MODE_RELOC,
4907
                     name=self.op.instance_name,
4908
                     relocate_from=[self.sec_node])
4909

    
4910
    ial.Run(self.op.iallocator)
4911

    
4912
    if not ial.success:
4913
      raise errors.OpPrereqError("Can't compute nodes using"
4914
                                 " iallocator '%s': %s" % (self.op.iallocator,
4915
                                                           ial.info))
4916
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
4920
    self.op.remote_node = ial.nodes[0]
4921
    self.LogInfo("Selected new secondary for the instance: %s",
4922
                 self.op.remote_node)
4923

    
4924
  def BuildHooksEnv(self):
4925
    """Build hooks env.
4926

4927
    This runs on the master, the primary and all the secondaries.
4928

4929
    """
4930
    env = {
4931
      "MODE": self.op.mode,
4932
      "NEW_SECONDARY": self.op.remote_node,
4933
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
4934
      }
4935
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4936
    nl = [
4937
      self.cfg.GetMasterNode(),
4938
      self.instance.primary_node,
4939
      ]
4940
    if self.op.remote_node is not None:
4941
      nl.append(self.op.remote_node)
4942
    return env, nl, nl
4943

    
4944
  def CheckPrereq(self):
4945
    """Check prerequisites.
4946

4947
    This checks that the instance is in the cluster.
4948

4949
    """
4950
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4951
    assert instance is not None, \
4952
      "Cannot retrieve locked instance %s" % self.op.instance_name
4953
    self.instance = instance
4954

    
4955
    if instance.disk_template != constants.DT_DRBD8:
4956
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
4957
                                 " instances")
4958

    
4959
    if len(instance.secondary_nodes) != 1:
4960
      raise errors.OpPrereqError("The instance has a strange layout,"
4961
                                 " expected one secondary but found %d" %
4962
                                 len(instance.secondary_nodes))
4963

    
4964
    self.sec_node = instance.secondary_nodes[0]
4965

    
4966
    if self.op.iallocator is not None:
4967
      self._RunAllocator()
4968

    
4969
    remote_node = self.op.remote_node
4970
    if remote_node is not None:
4971
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4972
      assert self.remote_node_info is not None, \
4973
        "Cannot retrieve locked node %s" % remote_node
4974
    else:
4975
      self.remote_node_info = None
4976
    if remote_node == instance.primary_node:
4977
      raise errors.OpPrereqError("The specified node is the primary node of"
4978
                                 " the instance.")
4979
    elif remote_node == self.sec_node:
4980
      raise errors.OpPrereqError("The specified node is already the"
4981
                                 " secondary node of the instance.")
4982

    
4983
    if self.op.mode == constants.REPLACE_DISK_PRI:
4984
      n1 = self.tgt_node = instance.primary_node
4985
      n2 = self.oth_node = self.sec_node
4986
    elif self.op.mode == constants.REPLACE_DISK_SEC:
4987
      n1 = self.tgt_node = self.sec_node
4988
      n2 = self.oth_node = instance.primary_node
4989
    elif self.op.mode == constants.REPLACE_DISK_CHG:
4990
      n1 = self.new_node = remote_node
4991
      n2 = self.oth_node = instance.primary_node
4992
      self.tgt_node = self.sec_node
4993
      _CheckNodeNotDrained(self, remote_node)
4994
    else:
4995
      raise errors.ProgrammerError("Unhandled disk replace mode")
4996

    
4997
    _CheckNodeOnline(self, n1)
4998
    _CheckNodeOnline(self, n2)
4999

    
5000
    if not self.op.disks:
5001
      self.op.disks = range(len(instance.disks))
5002

    
5003
    for disk_idx in self.op.disks:
5004
      instance.FindDisk(disk_idx)
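    # FindDisk is called here purely for validation: for an index that does
    # not exist it is expected to raise, aborting the prerequisite check
    # before any disk is touched (descriptive note, see objects.Instance).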
5005

    
5006
  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.
5008

5009
    The algorithm for replace is quite complicated:
5010

5011
      1. for each disk to be replaced:
5012

5013
        1. create new LVs on the target node with unique names
5014
        1. detach old LVs from the drbd device
5015
        1. rename old LVs to name_replaced.<time_t>
5016
        1. rename new LVs to old LVs
5017
        1. attach the new LVs (with the old names now) to the drbd device
5018

5019
      1. wait for sync across all devices
5020

5021
      1. for each modified disk:
5022

5023
        1. remove old LVs (which have the name name_replaced.<time_t>)
5024

5025
    Failures are not very well handled.
5026

5027
    """
5028
    steps_total = 6
5029
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5030
    instance = self.instance
5031
    iv_names = {}
5032
    vgname = self.cfg.GetVGName()
5033
    # start of work
5034
    cfg = self.cfg
5035
    tgt_node = self.tgt_node
5036
    oth_node = self.oth_node
5037

    
5038
    # Step: check device activation
5039
    self.proc.LogStep(1, steps_total, "check device existence")
5040
    info("checking volume groups")
5041
    my_vg = cfg.GetVGName()
5042
    results = self.rpc.call_vg_list([oth_node, tgt_node])
5043
    if not results:
5044
      raise errors.OpExecError("Can't list volume groups on the nodes")
5045
    for node in oth_node, tgt_node:
5046
      res = results[node]
5047
      if res.failed or not res.data or my_vg not in res.data:
5048
        raise errors.OpExecError("Volume group '%s' not found on %s" %
5049
                                 (my_vg, node))
5050
    for idx, dev in enumerate(instance.disks):
5051
      if idx not in self.op.disks:
5052
        continue
5053
      for node in tgt_node, oth_node:
5054
        info("checking disk/%d on %s" % (idx, node))
5055
        cfg.SetDiskID(dev, node)
5056
        result = self.rpc.call_blockdev_find(node, dev)
5057
        msg = result.RemoteFailMsg()
5058
        if not msg and not result.payload:
5059
          msg = "disk not found"
5060
        if msg:
5061
          raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5062
                                   (idx, node, msg))
5063

    
5064
    # Step: check other node consistency
5065
    self.proc.LogStep(2, steps_total, "check peer consistency")
5066
    for idx, dev in enumerate(instance.disks):
5067
      if idx not in self.op.disks:
5068
        continue
5069
      info("checking disk/%d consistency on %s" % (idx, oth_node))
5070
      if not _CheckDiskConsistency(self, dev, oth_node,
5071
                                   oth_node==instance.primary_node):
5072
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5073
                                 " to replace disks on this node (%s)" %
5074
                                 (oth_node, tgt_node))
5075

    
5076
    # Step: create new storage
5077
    self.proc.LogStep(3, steps_total, "allocate new storage")
5078
    for idx, dev in enumerate(instance.disks):
5079
      if idx not in self.op.disks:
5080
        continue
5081
      size = dev.size
5082
      cfg.SetDiskID(dev, tgt_node)
5083
      lv_names = [".disk%d_%s" % (idx, suf)
5084
                  for suf in ["data", "meta"]]
5085
      names = _GenerateUniqueNames(self, lv_names)
5086
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5087
                             logical_id=(vgname, names[0]))
5088
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5089
                             logical_id=(vgname, names[1]))
5090
      new_lvs = [lv_data, lv_meta]
5091
      old_lvs = dev.children
5092
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5093
      info("creating new local storage on %s for %s" %
5094
           (tgt_node, dev.iv_name))
5095
      # we pass force_create=True to force the LVM creation
5096
      for new_lv in new_lvs:
5097
        _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5098
                        _GetInstanceInfoText(instance), False)
5099

    
5100
    # Step: for each lv, detach+rename*2+attach
5101
    self.proc.LogStep(4, steps_total, "change drbd configuration")
5102
    for dev, old_lvs, new_lvs in iv_names.itervalues():
5103
      info("detaching %s drbd from local storage" % dev.iv_name)
5104
      result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5105
      result.Raise()
5106
      if not result.data:
5107
        raise errors.OpExecError("Can't detach drbd from local storage on node"
5108
                                 " %s for device %s" % (tgt_node, dev.iv_name))
5109
      #dev.children = []
5110
      #cfg.Update(instance)
5111

    
5112
      # ok, we created the new LVs, so now we know we have the needed
5113
      # storage; as such, we proceed on the target node to rename
5114
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5115
      # using the assumption that logical_id == physical_id (which in
5116
      # turn is the unique_id on that node)
5117

    
5118
      # FIXME(iustin): use a better name for the replaced LVs
5119
      temp_suffix = int(time.time())
5120
      ren_fn = lambda d, suff: (d.physical_id[0],
5121
                                d.physical_id[1] + "_replaced-%s" % suff)
5122
      # build the rename list based on what LVs exist on the node
5123
      rlist = []
5124
      for to_ren in old_lvs:
5125
        result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5126
        if not result.RemoteFailMsg() and result.payload:
5127
          # device exists
5128
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5129

    
5130
      info("renaming the old LVs on the target node")
5131
      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5132
      result.Raise()
5133
      if not result.data:
5134
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
5135
      # now we rename the new LVs to the old LVs
5136
      info("renaming the new LVs on the target node")
5137
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5138
      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5139
      result.Raise()
5140
      if not result.data:
5141
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
5142

    
5143
      for old, new in zip(old_lvs, new_lvs):
5144
        new.logical_id = old.logical_id
5145
        cfg.SetDiskID(new, tgt_node)
5146

    
5147
      for disk in old_lvs:
5148
        disk.logical_id = ren_fn(disk, temp_suffix)
5149
        cfg.SetDiskID(disk, tgt_node)
5150

    
5151
      # now that the new lvs have the old name, we can add them to the device
5152
      info("adding new mirror component on %s" % tgt_node)
5153
      result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5154
      if result.failed or not result.data:
5155
        for new_lv in new_lvs:
5156
          msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5157
          if msg:
5158
            warning("Can't rollback device %s: %s", dev, msg,
5159
                    hint="cleanup manually the unused logical volumes")
5160
        raise errors.OpExecError("Can't add local storage to drbd")
5161

    
5162
      dev.children = new_lvs
5163
      cfg.Update(instance)
5164

    
5165
    # Step: wait for sync
5166

    
5167
    # this can fail as the old devices are degraded and _WaitForSync
5168
    # does a combined result over all disks, so we don't check its
5169
    # return value
5170
    self.proc.LogStep(5, steps_total, "sync devices")
5171
    _WaitForSync(self, instance, unlock=True)
5172

    
5173
    # so check manually all the devices
5174
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5175
      cfg.SetDiskID(dev, instance.primary_node)
5176
      result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5177
      msg = result.RemoteFailMsg()
5178
      if not msg and not result.payload:
5179
        msg = "disk not found"
5180
      if msg:
5181
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
5182
                                 (name, msg))
5183
      if result.payload[5]:
5184
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
5185

    
5186
    # Step: remove old storage
5187
    self.proc.LogStep(6, steps_total, "removing old storage")
5188
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5189
      info("remove logical volumes for %s" % name)
5190
      for lv in old_lvs:
5191
        cfg.SetDiskID(lv, tgt_node)
5192
        msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5193
        if msg:
5194
          warning("Can't remove old LV: %s" % msg,
5195
                  hint="manually remove unused LVs")
5196
          continue
5197

    
5198
  def _ExecD8Secondary(self, feedback_fn):
5199
    """Replace the secondary node for drbd8.
5200

5201
    The algorithm for replace is quite complicated:
5202
      - for all disks of the instance:
5203
        - create new LVs on the new node with same names
5204
        - shutdown the drbd device on the old secondary
5205
        - disconnect the drbd network on the primary
5206
        - create the drbd device on the new secondary
5207
        - network attach the drbd on the primary, using an artifice:
5208
          the drbd code for Attach() will connect to the network if it
5209
          finds a device which is connected to the good local disks but
5210
          not network enabled
5211
      - wait for sync across all devices
5212
      - remove all disks from the old secondary
5213

5214
    Failures are not very well handled.
5215

5216
    """
5217
    steps_total = 6
5218
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5219
    instance = self.instance
5220
    iv_names = {}
5221
    # start of work
5222
    cfg = self.cfg
5223
    old_node = self.tgt_node
5224
    new_node = self.new_node
5225
    pri_node = instance.primary_node
5226
    nodes_ip = {
5227
      old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5228
      new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5229
      pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5230
      }
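    # nodes_ip maps each involved node name to its secondary IP; these
    # addresses are handed to the drbd disconnect/attach RPCs further down.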
5231

    
5232
    # Step: check device activation
5233
    self.proc.LogStep(1, steps_total, "check device existence")
5234
    info("checking volume groups")
5235
    my_vg = cfg.GetVGName()
5236
    results = self.rpc.call_vg_list([pri_node, new_node])
5237
    for node in pri_node, new_node:
5238
      res = results[node]
5239
      if res.failed or not res.data or my_vg not in res.data:
5240
        raise errors.OpExecError("Volume group '%s' not found on %s" %
5241
                                 (my_vg, node))
5242
    for idx, dev in enumerate(instance.disks):
5243
      if idx not in self.op.disks:
5244
        continue
5245
      info("checking disk/%d on %s" % (idx, pri_node))
5246
      cfg.SetDiskID(dev, pri_node)
5247
      result = self.rpc.call_blockdev_find(pri_node, dev)
5248
      msg = result.RemoteFailMsg()
5249
      if not msg and not result.payload:
5250
        msg = "disk not found"
5251
      if msg:
5252
        raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5253
                                 (idx, pri_node, msg))
5254

    
5255
    # Step: check other node consistency
5256
    self.proc.LogStep(2, steps_total, "check peer consistency")
5257
    for idx, dev in enumerate(instance.disks):
5258
      if idx not in self.op.disks:
5259
        continue
5260
      info("checking disk/%d consistency on %s" % (idx, pri_node))
5261
      if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5262
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
5263
                                 " unsafe to replace the secondary" %
5264
                                 pri_node)
5265

    
5266
    # Step: create new storage
5267
    self.proc.LogStep(3, steps_total, "allocate new storage")
5268
    for idx, dev in enumerate(instance.disks):
5269
      info("adding new local storage on %s for disk/%d" %
5270
           (new_node, idx))
5271
      # we pass force_create=True to force LVM creation
5272
      for new_lv in dev.children:
5273
        _CreateBlockDev(self, new_node, instance, new_lv, True,
5274
                        _GetInstanceInfoText(instance), False)
5275

    
5276
    # Step 4: change drbd minors and drbd setup
5277
    # after this, we must manually remove the drbd minors on both the
5278
    # error and the success paths
5279
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5280
                                   instance.name)
5281
    logging.debug("Allocated minors %s", minors)
5282
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
5283
    for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5284
      size = dev.size
5285
      info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5286
      # create new devices on new_node; note that we create two IDs:
5287
      # one without port, so the drbd will be activated without
5288
      # networking information on the new node at this stage, and one
5289
      # with network, for the latter activation in step 4
5290
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5291
      if pri_node == o_node1:
5292
        p_minor = o_minor1
5293
      else:
5294
        p_minor = o_minor2
5295

    
5296
      new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5297
      new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
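      # A drbd8 logical_id is the tuple (node_a, node_b, port, minor_a,
      # minor_b, secret), as unpacked from dev.logical_id above; new_alone_id
      # omits the port so the new device first comes up without networking.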
5298

    
5299
      iv_names[idx] = (dev, dev.children, new_net_id)
5300
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5301
                    new_net_id)
5302
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5303
                              logical_id=new_alone_id,
5304
                              children=dev.children)
5305
      try:
5306
        _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5307
                              _GetInstanceInfoText(instance), False)
5308
      except errors.GenericError:
5309
        self.cfg.ReleaseDRBDMinors(instance.name)
5310
        raise
5311

    
5312
    for idx, dev in enumerate(instance.disks):
5313
      # we have new devices, shutdown the drbd on the old secondary
5314
      info("shutting down drbd for disk/%d on old node" % idx)
5315
      cfg.SetDiskID(dev, old_node)
5316
      msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5317
      if msg:
5318
        warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5319
                (idx, msg),
5320
                hint="Please cleanup this device manually as soon as possible")
5321

    
5322
    info("detaching primary drbds from the network (=> standalone)")
5323
    result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5324
                                               instance.disks)[pri_node]
5325

    
5326
    msg = result.RemoteFailMsg()
5327
    if msg:
5328
      # detaches didn't succeed (unlikely)
5329
      self.cfg.ReleaseDRBDMinors(instance.name)
5330
      raise errors.OpExecError("Can't detach the disks from the network on"
5331
                               " old node: %s" % (msg,))
5332

    
5333
    # if we managed to detach at least one, we update all the disks of
5334
    # the instance to point to the new secondary
5335
    info("updating instance configuration")
5336
    for dev, _, new_logical_id in iv_names.itervalues():
5337
      dev.logical_id = new_logical_id
5338
      cfg.SetDiskID(dev, pri_node)
5339
    cfg.Update(instance)
5340

    
5341
    # and now perform the drbd attach
5342
    info("attaching primary drbds to new secondary (standalone => connected)")
5343
    result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5344
                                           instance.disks, instance.name,
5345
                                           False)
5346
    for to_node, to_result in result.items():
5347
      msg = to_result.RemoteFailMsg()
5348
      if msg:
5349
        warning("can't attach drbd disks on node %s: %s", to_node, msg,
5350
                hint="please do a gnt-instance info to see the"
5351
                " status of disks")
5352

    
5353
    # this can fail as the old devices are degraded and _WaitForSync
5354
    # does a combined result over all disks, so we don't check its
5355
    # return value
5356
    self.proc.LogStep(5, steps_total, "sync devices")
5357
    _WaitForSync(self, instance, unlock=True)
5358

    
5359
    # so check manually all the devices
5360
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
5361
      cfg.SetDiskID(dev, pri_node)
5362
      result = self.rpc.call_blockdev_find(pri_node, dev)
5363
      msg = result.RemoteFailMsg()
5364
      if not msg and not result.payload:
5365
        msg = "disk not found"
5366
      if msg:
5367
        raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5368
                                 (idx, msg))
5369
      if result.payload[5]:
5370
        raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5371

    
5372
    self.proc.LogStep(6, steps_total, "removing old storage")
5373
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
5374
      info("remove logical volumes for disk/%d" % idx)
5375
      for lv in old_lvs:
5376
        cfg.SetDiskID(lv, old_node)
5377
        msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5378
        if msg:
5379
          warning("Can't remove LV on old secondary: %s", msg,
5380
                  hint="Cleanup stale volumes by hand")
5381

    
5382
  def Exec(self, feedback_fn):
5383
    """Execute disk replacement.
5384

5385
    This dispatches the disk replacement to the appropriate handler.
5386

5387
    """
5388
    instance = self.instance
5389

    
5390
    # Activate the instance disks if we're replacing them on a down instance
5391
    if not instance.admin_up:
5392
      _StartInstanceDisks(self, instance, True)
5393

    
5394
    if self.op.mode == constants.REPLACE_DISK_CHG:
5395
      fn = self._ExecD8Secondary
5396
    else:
5397
      fn = self._ExecD8DiskOnly
5398

    
5399
    ret = fn(feedback_fn)
5400

    
5401
    # Deactivate the instance disks if we're replacing them on a down instance
5402
    if not instance.admin_up:
5403
      _SafeShutdownInstanceDisks(self, instance)
5404

    
5405
    return ret
5406

    
5407

    
5408
class LUGrowDisk(LogicalUnit):
5409
  """Grow a disk of an instance.
5410

5411
  """
5412
  HPATH = "disk-grow"
5413
  HTYPE = constants.HTYPE_INSTANCE
5414
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5415
  REQ_BGL = False
5416

    
5417
  def ExpandNames(self):
5418
    self._ExpandAndLockInstance()
5419
    self.needed_locks[locking.LEVEL_NODE] = []
5420
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5421

    
5422
  def DeclareLocks(self, level):
5423
    if level == locking.LEVEL_NODE:
5424
      self._LockInstancesNodes()
5425

    
5426
  def BuildHooksEnv(self):
5427
    """Build hooks env.
5428

5429
    This runs on the master, the primary and all the secondaries.
5430

5431
    """
5432
    env = {
5433
      "DISK": self.op.disk,
5434
      "AMOUNT": self.op.amount,
5435
      }
5436
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5437
    nl = [
5438
      self.cfg.GetMasterNode(),
5439
      self.instance.primary_node,
5440
      ]
5441
    return env, nl, nl
5442

    
5443
  def CheckPrereq(self):
5444
    """Check prerequisites.
5445

5446
    This checks that the instance is in the cluster.
5447

5448
    """
5449
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5450
    assert instance is not None, \
5451
      "Cannot retrieve locked instance %s" % self.op.instance_name
5452
    nodenames = list(instance.all_nodes)
5453
    for node in nodenames:
5454
      _CheckNodeOnline(self, node)
5455

    
5456

    
5457
    self.instance = instance
5458

    
5459
    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5460
      raise errors.OpPrereqError("Instance's disk layout does not support"
5461
                                 " growing.")
5462

    
5463
    self.disk = instance.FindDisk(self.op.disk)
5464

    
5465
    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5466
                                       instance.hypervisor)
5467
    for node in nodenames:
5468
      info = nodeinfo[node]
5469
      if info.failed or not info.data:
5470
        raise errors.OpPrereqError("Cannot get current information"
5471
                                   " from node '%s'" % node)
5472
      vg_free = info.data.get('vg_free', None)
5473
      if not isinstance(vg_free, int):
5474
        raise errors.OpPrereqError("Can't compute free disk space on"
5475
                                   " node %s" % node)
5476
      if self.op.amount > vg_free:
5477
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
5478
                                   " %d MiB available, %d MiB required" %
5479
                                   (node, vg_free, self.op.amount))
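    # self.op.amount is the increment in MiB; it must fit into the volume
    # group on every node that holds a copy of the disk (all_nodes above),
    # since Exec grows the block device on each of them.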
5480

    
5481
  def Exec(self, feedback_fn):
5482
    """Execute disk grow.
5483

5484
    """
5485
    instance = self.instance
5486
    disk = self.disk
5487
    for node in instance.all_nodes:
5488
      self.cfg.SetDiskID(disk, node)
5489
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5490
      msg = result.RemoteFailMsg()
5491
      if msg:
5492
        raise errors.OpExecError("Grow request failed to node %s: %s" %
5493
                                 (node, msg))
5494
    disk.RecordGrow(self.op.amount)
5495
    self.cfg.Update(instance)
5496
    if self.op.wait_for_sync:
5497
      disk_abort = not _WaitForSync(self, instance)
5498
      if disk_abort:
5499
        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5500
                             " status.\nPlease check the instance.")
5501

    
5502

    
5503
class LUQueryInstanceData(NoHooksLU):
5504
  """Query runtime instance data.
5505

5506
  """
5507
  _OP_REQP = ["instances", "static"]
5508
  REQ_BGL = False
5509

    
5510
  def ExpandNames(self):
5511
    self.needed_locks = {}
5512
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5513

    
5514
    if not isinstance(self.op.instances, list):
5515
      raise errors.OpPrereqError("Invalid argument type 'instances'")
5516

    
5517
    if self.op.instances:
5518
      self.wanted_names = []
5519
      for name in self.op.instances:
5520
        full_name = self.cfg.ExpandInstanceName(name)
5521
        if full_name is None:
5522
          raise errors.OpPrereqError("Instance '%s' not known" % name)
5523
        self.wanted_names.append(full_name)
5524
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5525
    else:
5526
      self.wanted_names = None
5527
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5528

    
5529
    self.needed_locks[locking.LEVEL_NODE] = []
5530
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5531

    
5532
  def DeclareLocks(self, level):
5533
    if level == locking.LEVEL_NODE:
5534
      self._LockInstancesNodes()
5535

    
5536
  def CheckPrereq(self):
5537
    """Check prerequisites.
5538

5539
    This only checks the optional instance list against the existing names.
5540

5541
    """
5542
    if self.wanted_names is None:
5543
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5544

    
5545
    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5546
                             in self.wanted_names]
5547
    return
5548

    
5549
  def _ComputeDiskStatus(self, instance, snode, dev):
5550
    """Compute block device status.
5551

5552
    """
5553
    static = self.op.static
5554
    if not static:
5555
      self.cfg.SetDiskID(dev, instance.primary_node)
5556
      dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5557
      if dev_pstatus.offline:
5558
        dev_pstatus = None
5559
      else:
5560
        msg = dev_pstatus.RemoteFailMsg()
5561
        if msg:
5562
          raise errors.OpExecError("Can't compute disk status for %s: %s" %
5563
                                   (instance.name, msg))
5564
        dev_pstatus = dev_pstatus.payload
5565
    else:
5566
      dev_pstatus = None
5567

    
5568
    if dev.dev_type in constants.LDS_DRBD:
5569
      # we change the snode then (otherwise we use the one passed in)
5570
      if dev.logical_id[0] == instance.primary_node:
5571
        snode = dev.logical_id[1]
5572
      else:
5573
        snode = dev.logical_id[0]
5574

    
5575
    if snode and not static:
5576
      self.cfg.SetDiskID(dev, snode)
5577
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5578
      if dev_sstatus.offline:
5579
        dev_sstatus = None
5580
      else:
5581
        msg = dev_sstatus.RemoteFailMsg()
5582
        if msg:
5583
          raise errors.OpExecError("Can't compute disk status for %s: %s" %
5584
                                   (instance.name, msg))
5585
        dev_sstatus = dev_sstatus.payload
5586
    else:
5587
      dev_sstatus = None
5588

    
5589
    if dev.children:
5590
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
5591
                      for child in dev.children]
5592
    else:
5593
      dev_children = []
5594

    
5595
    data = {
5596
      "iv_name": dev.iv_name,
5597
      "dev_type": dev.dev_type,
5598
      "logical_id": dev.logical_id,
5599
      "physical_id": dev.physical_id,
5600
      "pstatus": dev_pstatus,
5601
      "sstatus": dev_sstatus,
5602
      "children": dev_children,
5603
      "mode": dev.mode,
5604
      }
5605

    
5606
    return data
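    # For reference, the dict returned above looks roughly like this for a
    # plain LVM-backed disk (all values are illustrative, not real data):
    #   {"iv_name": "disk/0", "dev_type": "lvm", "logical_id": (...),
    #    "physical_id": (...), "pstatus": ..., "sstatus": None,
    #    "children": [], "mode": "rw"}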
5607

    
5608
  def Exec(self, feedback_fn):
5609
    """Gather and return data"""
5610
    result = {}
5611

    
5612
    cluster = self.cfg.GetClusterInfo()
5613

    
5614
    for instance in self.wanted_instances:
5615
      if not self.op.static:
5616
        remote_info = self.rpc.call_instance_info(instance.primary_node,
5617
                                                  instance.name,
5618
                                                  instance.hypervisor)
5619
        remote_info.Raise()
5620
        remote_info = remote_info.data
5621
        if remote_info and "state" in remote_info:
5622
          remote_state = "up"
5623
        else:
5624
          remote_state = "down"
5625
      else:
5626
        remote_state = None
5627
      if instance.admin_up:
5628
        config_state = "up"
5629
      else:
5630
        config_state = "down"
5631

    
5632
      disks = [self._ComputeDiskStatus(instance, None, device)
5633
               for device in instance.disks]
5634

    
5635
      idict = {
5636
        "name": instance.name,
5637
        "config_state": config_state,
5638
        "run_state": remote_state,
5639
        "pnode": instance.primary_node,
5640
        "snodes": instance.secondary_nodes,
5641
        "os": instance.os,
5642
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5643
        "disks": disks,
5644
        "hypervisor": instance.hypervisor,
5645
        "network_port": instance.network_port,
5646
        "hv_instance": instance.hvparams,
5647
        "hv_actual": cluster.FillHV(instance),
5648
        "be_instance": instance.beparams,
5649
        "be_actual": cluster.FillBE(instance),
5650
        }
5651

    
5652
      result[instance.name] = idict
5653

    
5654
    return result
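  # For reference, each value in the returned dictionary is a dict shaped
  # roughly like the following (values are illustrative, not real data):
  #   {"name": "inst1.example.com", "config_state": "up", "run_state": "up",
  #    "pnode": "node1", "snodes": ["node2"], "os": "debootstrap",
  #    "nics": [(mac, ip, bridge), ...], "disks": [...],
  #    "hypervisor": "xen-pvm", "network_port": None,
  #    "hv_instance": {...}, "hv_actual": {...},
  #    "be_instance": {...}, "be_actual": {...}}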
5655

    
5656

    
5657
class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
5661
  HPATH = "instance-modify"
5662
  HTYPE = constants.HTYPE_INSTANCE
5663
  _OP_REQP = ["instance_name"]
5664
  REQ_BGL = False
5665

    
5666
  def CheckArguments(self):
5667
    if not hasattr(self.op, 'nics'):
5668
      self.op.nics = []
5669
    if not hasattr(self.op, 'disks'):
5670
      self.op.disks = []
5671
    if not hasattr(self.op, 'beparams'):
5672
      self.op.beparams = {}
5673
    if not hasattr(self.op, 'hvparams'):
5674
      self.op.hvparams = {}
5675
    self.op.force = getattr(self.op, "force", False)
5676
    if not (self.op.nics or self.op.disks or
5677
            self.op.hvparams or self.op.beparams):
5678
      raise errors.OpPrereqError("No changes submitted")
5679

    
5680
    # Disk validation
5681
    disk_addremove = 0
5682
    for disk_op, disk_dict in self.op.disks:
5683
      if disk_op == constants.DDM_REMOVE:
5684
        disk_addremove += 1
5685
        continue
5686
      elif disk_op == constants.DDM_ADD:
5687
        disk_addremove += 1
5688
      else:
5689
        if not isinstance(disk_op, int):
5690
          raise errors.OpPrereqError("Invalid disk index")
5691
      if disk_op == constants.DDM_ADD:
5692
        mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5693
        if mode not in constants.DISK_ACCESS_SET:
5694
          raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5695
        size = disk_dict.get('size', None)
5696
        if size is None:
5697
          raise errors.OpPrereqError("Required disk parameter size missing")
5698
        try:
5699
          size = int(size)
5700
        except ValueError, err:
5701
          raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5702
                                     str(err))
5703
        disk_dict['size'] = size
5704
      else:
5705
        # modification of disk
5706
        if 'size' in disk_dict:
5707
          raise errors.OpPrereqError("Disk size change not possible, use"
5708
                                     " grow-disk")
5709

    
5710
    if disk_addremove > 1:
5711
      raise errors.OpPrereqError("Only one disk add or remove operation"
5712
                                 " supported at a time")
5713

    
5714
    # NIC validation
5715
    nic_addremove = 0
5716
    for nic_op, nic_dict in self.op.nics:
5717
      if nic_op == constants.DDM_REMOVE:
5718
        nic_addremove += 1
5719
        continue
5720
      elif nic_op == constants.DDM_ADD:
5721
        nic_addremove += 1
5722
      else:
5723
        if not isinstance(nic_op, int):
5724
          raise errors.OpPrereqError("Invalid nic index")
5725

    
5726
      # nic_dict should be a dict
5727
      nic_ip = nic_dict.get('ip', None)
5728
      if nic_ip is not None:
5729
        if nic_ip.lower() == constants.VALUE_NONE:
5730
          nic_dict['ip'] = None
5731
        else:
5732
          if not utils.IsValidIP(nic_ip):
5733
            raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5734

    
5735
      if nic_op == constants.DDM_ADD:
5736
        nic_bridge = nic_dict.get('bridge', None)
5737
        if nic_bridge is None:
5738
          nic_dict['bridge'] = self.cfg.GetDefBridge()
5739
        nic_mac = nic_dict.get('mac', None)
5740
        if nic_mac is None:
5741
          nic_dict['mac'] = constants.VALUE_AUTO
5742

    
5743
      if 'mac' in nic_dict:
5744
        nic_mac = nic_dict['mac']
5745
        if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5746
          if not utils.IsValidMac(nic_mac):
5747
            raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5748
        if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5749
          raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5750
                                     " modifying an existing nic")
5751

    
5752
    if nic_addremove > 1:
5753
      raise errors.OpPrereqError("Only one NIC add or remove operation"
5754
                                 " supported at a time")
5755
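    # Illustrative examples of the modification lists validated above (the
    # concrete values are made up):
    #   self.op.disks = [(constants.DDM_ADD,
    #                     {'size': 1024, 'mode': constants.DISK_RDWR})]
    #   self.op.nics  = [(0, {'ip': '192.0.2.10'}),
    #                    (constants.DDM_REMOVE, {})]
    # i.e. each entry is either an integer index (modify an existing item)
    # or DDM_ADD/DDM_REMOVE.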

    
5756
  def ExpandNames(self):
5757
    self._ExpandAndLockInstance()
5758
    self.needed_locks[locking.LEVEL_NODE] = []
5759
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5760

    
5761
  def DeclareLocks(self, level):
5762
    if level == locking.LEVEL_NODE:
5763
      self._LockInstancesNodes()
5764

    
5765
  def BuildHooksEnv(self):
5766
    """Build hooks env.
5767

5768
    This runs on the master, primary and secondaries.
5769

5770
    """
5771
    args = dict()
5772
    if constants.BE_MEMORY in self.be_new:
5773
      args['memory'] = self.be_new[constants.BE_MEMORY]
5774
    if constants.BE_VCPUS in self.be_new:
5775
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
5776
    # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
5777
    # information at all.
5778
    if self.op.nics:
5779
      args['nics'] = []
5780
      nic_override = dict(self.op.nics)
5781
      for idx, nic in enumerate(self.instance.nics):
5782
        if idx in nic_override:
5783
          this_nic_override = nic_override[idx]
5784
        else:
5785
          this_nic_override = {}
5786
        if 'ip' in this_nic_override:
5787
          ip = this_nic_override['ip']
5788
        else:
5789
          ip = nic.ip
5790
        if 'bridge' in this_nic_override:
5791
          bridge = this_nic_override['bridge']
5792
        else:
5793
          bridge = nic.bridge
5794
        if 'mac' in this_nic_override:
5795
          mac = this_nic_override['mac']
5796
        else:
5797
          mac = nic.mac
5798
        args['nics'].append((ip, bridge, mac))
5799
      if constants.DDM_ADD in nic_override:
5800
        ip = nic_override[constants.DDM_ADD].get('ip', None)
5801
        bridge = nic_override[constants.DDM_ADD]['bridge']
5802
        mac = nic_override[constants.DDM_ADD]['mac']
5803
        args['nics'].append((ip, bridge, mac))
5804
      elif constants.DDM_REMOVE in nic_override:
5805
        del args['nics'][-1]
5806

    
5807
    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5808
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
5809
    return env, nl, nl
5810

    
5811
  def CheckPrereq(self):
5812
    """Check prerequisites.
5813

5814
    This only checks the instance list against the existing names.
5815

5816
    """
5817
    force = self.force = self.op.force
5818

    
5819
    # checking the new params on the primary/secondary nodes
5820

    
5821
    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5822
    assert self.instance is not None, \
5823
      "Cannot retrieve locked instance %s" % self.op.instance_name
5824
    pnode = instance.primary_node
5825
    nodelist = list(instance.all_nodes)
5826

    
5827
    # hvparams processing
5828
    if self.op.hvparams:
5829
      i_hvdict = copy.deepcopy(instance.hvparams)
5830
      for key, val in self.op.hvparams.iteritems():
5831
        if val == constants.VALUE_DEFAULT:
5832
          try:
5833
            del i_hvdict[key]
5834
          except KeyError:
5835
            pass
5836
        else:
5837
          i_hvdict[key] = val
5838
      cluster = self.cfg.GetClusterInfo()
5839
      utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
5840
      hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5841
                                i_hvdict)
5842
      # local check
5843
      hypervisor.GetHypervisor(
5844
        instance.hypervisor).CheckParameterSyntax(hv_new)
5845
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5846
      self.hv_new = hv_new # the new actual values
5847
      self.hv_inst = i_hvdict # the new dict (without defaults)
5848
    else:
5849
      self.hv_new = self.hv_inst = {}
5850

    
5851
    # beparams processing
5852
    if self.op.beparams:
5853
      i_bedict = copy.deepcopy(instance.beparams)
5854
      for key, val in self.op.beparams.iteritems():
5855
        if val == constants.VALUE_DEFAULT:
5856
          try:
5857
            del i_bedict[key]
5858
          except KeyError:
5859
            pass
5860
        else:
5861
          i_bedict[key] = val
5862
      cluster = self.cfg.GetClusterInfo()
5863
      utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
5864
      be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5865
                                i_bedict)
5866
      self.be_new = be_new # the new actual values
5867
      self.be_inst = i_bedict # the new dict (without defaults)
5868
    else:
5869
      self.be_new = self.be_inst = {}
5870

    
5871
    self.warn = []
5872

    
5873
    if constants.BE_MEMORY in self.op.beparams and not self.force:
5874
      mem_check_list = [pnode]
5875
      if be_new[constants.BE_AUTO_BALANCE]:
5876
        # either we changed auto_balance to yes or it was from before
5877
        mem_check_list.extend(instance.secondary_nodes)
5878
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
5879
                                                  instance.hypervisor)
5880
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5881
                                         instance.hypervisor)
5882
      if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5883
        # Assume the primary node is unreachable and go ahead
5884
        self.warn.append("Can't get info from primary node %s" % pnode)
5885
      else:
5886
        if not instance_info.failed and instance_info.data:
5887
          current_mem = instance_info.data['memory']
5888
        else:
5889
          # Assume instance not running
5890
          # (there is a slight race condition here, but it's not very probable,
5891
          # and we have no other way to check)
5892
          current_mem = 0
5893
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5894
                    nodeinfo[pnode].data['memory_free'])
5895
        if miss_mem > 0:
5896
          raise errors.OpPrereqError("This change will prevent the instance"
5897
                                     " from starting, due to %d MB of memory"
5898
                                     " missing on its primary node" % miss_mem)
5899

    
5900
      if be_new[constants.BE_AUTO_BALANCE]:
5901
        for node, nres in nodeinfo.iteritems():
5902
          if node not in instance.secondary_nodes:
5903
            continue
5904
          if nres.failed or not isinstance(nres.data, dict):
5905
            self.warn.append("Can't get info from secondary node %s" % node)
5906
          elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5907
            self.warn.append("Not enough memory to failover instance to"
5908
                             " secondary node %s" % node)
5909

    
5910
    # NIC processing
5911
    for nic_op, nic_dict in self.op.nics:
5912
      if nic_op == constants.DDM_REMOVE:
5913
        if not instance.nics:
5914
          raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5915
        continue
5916
      if nic_op != constants.DDM_ADD:
5917
        # an existing nic
5918
        if nic_op < 0 or nic_op >= len(instance.nics):
5919
          raise errors.OpPrereqError("Invalid NIC index %s, valid values"
5920
                                     " are 0 to %d" %
5921
                                     (nic_op, len(instance.nics)))
5922
      if 'bridge' in nic_dict:
5923
        nic_bridge = nic_dict['bridge']
5924
        if nic_bridge is None:
5925
          raise errors.OpPrereqError('Cannot set the nic bridge to None')
5926
        if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
5927
          msg = ("Bridge '%s' doesn't exist on one of"
5928
                 " the instance nodes" % nic_bridge)
5929
          if self.force:
5930
            self.warn.append(msg)
5931
          else:
5932
            raise errors.OpPrereqError(msg)
5933
      if 'mac' in nic_dict:
5934
        nic_mac = nic_dict['mac']
5935
        if nic_mac is None:
5936
          raise errors.OpPrereqError('Cannot set the nic mac to None')
5937
        elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5938
          # otherwise generate the mac
5939
          nic_dict['mac'] = self.cfg.GenerateMAC()
5940
        else:
5941
          # or validate/reserve the current one
5942
          if self.cfg.IsMacInUse(nic_mac):
5943
            raise errors.OpPrereqError("MAC address %s already in use"
5944
                                       " in cluster" % nic_mac)
5945

    
5946
    # DISK processing
5947
    if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
5948
      raise errors.OpPrereqError("Disk operations not supported for"
5949
                                 " diskless instances")
5950
    for disk_op, disk_dict in self.op.disks:
5951
      if disk_op == constants.DDM_REMOVE:
5952
        if len(instance.disks) == 1:
5953
          raise errors.OpPrereqError("Cannot remove the last disk of"
5954
                                     " an instance")
5955
        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
5956
        ins_l = ins_l[pnode]
5957
        if ins_l.failed or not isinstance(ins_l.data, list):
5958
          raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
5959
        if instance.name in ins_l.data:
5960
          raise errors.OpPrereqError("Instance is running, can't remove"
5961
                                     " disks.")
5962

    
5963
      if (disk_op == constants.DDM_ADD and
          len(instance.disks) >= constants.MAX_DISKS):
        raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
                                   " add more" % constants.MAX_DISKS)
5967
      if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
5968
        # an existing disk
5969
        if disk_op < 0 or disk_op >= len(instance.disks):
5970
          raise errors.OpPrereqError("Invalid disk index %s, valid values"
5971
                                     " are 0 to %d" %
5972
                                     (disk_op, len(instance.disks)))
5973

    
5974
    return
5975

    
5976
  def Exec(self, feedback_fn):
5977
    """Modifies an instance.
5978

5979
    All parameters take effect only at the next restart of the instance.
5980

5981
    """
5982
    # Process here the warnings from CheckPrereq, as we don't have a
5983
    # feedback_fn there.
5984
    for warn in self.warn:
5985
      feedback_fn("WARNING: %s" % warn)
5986

    
5987
    result = []
5988
    instance = self.instance
5989
    # disk changes
5990
    for disk_op, disk_dict in self.op.disks:
5991
      if disk_op == constants.DDM_REMOVE:
5992
        # remove the last disk
5993
        device = instance.disks.pop()
5994
        device_idx = len(instance.disks)
5995
        for node, disk in device.ComputeNodeTree(instance.primary_node):
5996
          self.cfg.SetDiskID(disk, node)
5997
          msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
5998
          if msg:
5999
            self.LogWarning("Could not remove disk/%d on node %s: %s,"
6000
                            " continuing anyway", device_idx, node, msg)
6001
        result.append(("disk/%d" % device_idx, "remove"))
6002
      elif disk_op == constants.DDM_ADD:
6003
        # add a new disk
6004
        if instance.disk_template == constants.DT_FILE:
6005
          file_driver, file_path = instance.disks[0].logical_id
6006
          file_path = os.path.dirname(file_path)
6007
        else:
6008
          file_driver = file_path = None
6009
        disk_idx_base = len(instance.disks)
6010
        new_disk = _GenerateDiskTemplate(self,
6011
                                         instance.disk_template,
6012
                                         instance.name, instance.primary_node,
6013
                                         instance.secondary_nodes,
6014
                                         [disk_dict],
6015
                                         file_path,
6016
                                         file_driver,
6017
                                         disk_idx_base)[0]
6018
        instance.disks.append(new_disk)
6019
        info = _GetInstanceInfoText(instance)
6020

    
6021
        logging.info("Creating volume %s for instance %s",
6022
                     new_disk.iv_name, instance.name)
6023
        # Note: this needs to be kept in sync with _CreateDisks
6024
        #HARDCODE
6025
        for node in instance.all_nodes:
6026
          f_create = node == instance.primary_node
6027
          try:
6028
            _CreateBlockDev(self, node, instance, new_disk,
6029
                            f_create, info, f_create)
6030
          except errors.OpExecError, err:
6031
            self.LogWarning("Failed to create volume %s (%s) on"
6032
                            " node %s: %s",
6033
                            new_disk.iv_name, new_disk, node, err)
6034
        result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6035
                       (new_disk.size, new_disk.mode)))
6036
      else:
6037
        # change a given disk
6038
        instance.disks[disk_op].mode = disk_dict['mode']
6039
        result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6040
    # NIC changes
6041
    for nic_op, nic_dict in self.op.nics:
6042
      if nic_op == constants.DDM_REMOVE:
6043
        # remove the last nic
6044
        del instance.nics[-1]
6045
        result.append(("nic.%d" % len(instance.nics), "remove"))
6046
      elif nic_op == constants.DDM_ADD:
6047
        # mac and bridge should be set, by now
6048
        mac = nic_dict['mac']
6049
        bridge = nic_dict['bridge']
6050
        new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
6051
                              bridge=bridge)
6052
        instance.nics.append(new_nic)
6053
        result.append(("nic.%d" % (len(instance.nics) - 1),
6054
                       "add:mac=%s,ip=%s,bridge=%s" %
6055
                       (new_nic.mac, new_nic.ip, new_nic.bridge)))
6056
      else:
6057
        # change a given nic
6058
        for key in 'mac', 'ip', 'bridge':
6059
          if key in nic_dict:
6060
            setattr(instance.nics[nic_op], key, nic_dict[key])
6061
            result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
6062

    
6063
    # hvparams changes
6064
    if self.op.hvparams:
6065
      instance.hvparams = self.hv_inst
6066
      for key, val in self.op.hvparams.iteritems():
6067
        result.append(("hv/%s" % key, val))
6068

    
6069
    # beparams changes
6070
    if self.op.beparams:
6071
      instance.beparams = self.be_inst
6072
      for key, val in self.op.beparams.iteritems():
6073
        result.append(("be/%s" % key, val))
6074

    
6075
    self.cfg.Update(instance)
6076

    
6077
    return result
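  # The list returned above contains (parameter, new value) pairs for the
  # caller to report; an illustrative result for a disk add plus a memory
  # change would be roughly:
  #   [("disk/1", "add:size=1024,mode=rw"), ("be/memory", 512)]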
6078

    
6079

    
6080
class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = ['nodes']
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    @rtype: dict
    @return: a dictionary with the structure node->(export-list)
        where export-list is a list of the instances exported on
        that node.

    """
    rpcresult = self.rpc.call_export_list(self.nodes)
    result = {}
    for node in rpcresult:
      if rpcresult[node].failed:
        result[node] = False
      else:
        result[node] = rpcresult[node].data

    return result
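  # Illustrative shape of the mapping returned above (False marks a node
  # that could not be queried):
  #   {"node1.example.com": ["inst1.example.com", "inst2.example.com"],
  #    "node2.example.com": False}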
class LUExportInstance(LogicalUnit):
6123
  """Export an instance to an image in the cluster.
6124

6125
  """
6126
  HPATH = "instance-export"
6127
  HTYPE = constants.HTYPE_INSTANCE
6128
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
6129
  REQ_BGL = False
6130

    
6131
  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove it afterwards
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6142

    
6143
  def DeclareLocks(self, level):
6144
    """Last minute lock declaration."""
6145
    # All nodes are locked anyway, so nothing to do here.
6146

    
6147
  def BuildHooksEnv(self):
6148
    """Build hooks env.
6149

6150
    This will run on the master, primary node and target node.
6151

6152
    """
6153
    env = {
6154
      "EXPORT_NODE": self.op.target_node,
6155
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6156
      }
6157
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6158
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6159
          self.op.target_node]
6160
    return env, nl, nl
6161

    
6162
  def CheckPrereq(self):
6163
    """Check prerequisites.
6164

6165
    This checks that the instance and node names are valid.
6166

6167
    """
6168
    instance_name = self.op.instance_name
6169
    self.instance = self.cfg.GetInstanceInfo(instance_name)
6170
    assert self.instance is not None, \
6171
          "Cannot retrieve locked instance %s" % self.op.instance_name
6172
    _CheckNodeOnline(self, self.instance.primary_node)
6173

    
6174
    self.dst_node = self.cfg.GetNodeInfo(
6175
      self.cfg.ExpandNodeName(self.op.target_node))
6176

    
6177
    if self.dst_node is None:
      # This is a wrong node name, not a non-locked node
      raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6180
    _CheckNodeOnline(self, self.dst_node.name)
6181
    _CheckNodeNotDrained(self, self.dst_node.name)
6182

    
6183
    # instance disk type verification
6184
    for disk in self.instance.disks:
6185
      if disk.dev_type == constants.LD_FILE:
6186
        raise errors.OpPrereqError("Export not supported for instances with"
6187
                                   " file-based disks")
6188

    
6189
  def Exec(self, feedback_fn):
6190
    """Export an instance to an image in the cluster.
6191

6192
    """
6193
    instance = self.instance
6194
    dst_node = self.dst_node
6195
    src_node = instance.primary_node
6196
    if self.op.shutdown:
6197
      # shutdown the instance, but not the disks
6198
      result = self.rpc.call_instance_shutdown(src_node, instance)
6199
      msg = result.RemoteFailMsg()
6200
      if msg:
6201
        raise errors.OpExecError("Could not shutdown instance %s on"
6202
                                 " node %s: %s" %
6203
                                 (instance.name, src_node, msg))
6204

    
6205
    vgname = self.cfg.GetVGName()
6206

    
6207
    snap_disks = []
6208

    
6209
    # set the disks ID correctly since call_instance_start needs the
6210
    # correct drbd minor to create the symlinks
6211
    for disk in instance.disks:
6212
      self.cfg.SetDiskID(disk, src_node)
6213

    
6214
    try:
6215
      for disk in instance.disks:
6216
        # new_dev_name will be a snapshot of an lvm leaf of the one we passed
6217
        new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
6218
        if new_dev_name.failed or not new_dev_name.data:
6219
          self.LogWarning("Could not snapshot block device %s on node %s",
6220
                          disk.logical_id[1], src_node)
6221
          snap_disks.append(False)
6222
        else:
6223
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6224
                                 logical_id=(vgname, new_dev_name.data),
6225
                                 physical_id=(vgname, new_dev_name.data),
6226
                                 iv_name=disk.iv_name)
6227
          snap_disks.append(new_dev)
6228

    
6229
    finally:
6230
      if self.op.shutdown and instance.admin_up:
6231
        result = self.rpc.call_instance_start(src_node, instance)
6232
        msg = result.RemoteFailMsg()
6233
        if msg:
6234
          _ShutdownInstanceDisks(self, instance)
6235
          raise errors.OpExecError("Could not start instance: %s" % msg)
6236

    
6237
    # TODO: check for size
6238

    
6239
    cluster_name = self.cfg.GetClusterName()
6240
    for idx, dev in enumerate(snap_disks):
6241
      if dev:
6242
        result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6243
                                               instance, cluster_name, idx)
6244
        if result.failed or not result.data:
6245
          self.LogWarning("Could not export block device %s from node %s to"
6246
                          " node %s", dev.logical_id[1], src_node,
6247
                          dst_node.name)
6248
        msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6249
        if msg:
6250
          self.LogWarning("Could not remove snapshot block device %s from node"
6251
                          " %s: %s", dev.logical_id[1], src_node, msg)
6252

    
6253
    result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6254
    if result.failed or not result.data:
6255
      self.LogWarning("Could not finalize export for instance %s on node %s",
6256
                      instance.name, dst_node.name)
6257

    
6258
    nodelist = self.cfg.GetNodeList()
6259
    nodelist.remove(dst_node.name)
6260

    
6261
    # on one-node clusters nodelist will be empty after the removal
6262
    # if we proceed the backup would be removed because OpQueryExports
6263
    # substitutes an empty list with the full cluster node list.
6264
    if nodelist:
6265
      exportlist = self.rpc.call_export_list(nodelist)
6266
      for node in exportlist:
6267
        if exportlist[node].failed:
6268
          continue
6269
        if instance.name in exportlist[node].data:
6270
          if not self.rpc.call_export_remove(node, instance.name):
6271
            self.LogWarning("Could not remove older export for instance %s"
6272
                            " on node %s", instance.name, node)
6273

    
6274

    
6275
class LURemoveExport(NoHooksLU):
6276
  """Remove exports related to the named instance.
6277

6278
  """
6279
  _OP_REQP = ["instance_name"]
6280
  REQ_BGL = False
6281

    
6282
  def ExpandNames(self):
6283
    self.needed_locks = {}
6284
    # We need all nodes to be locked in order for RemoveExport to work, but we
6285
    # don't need to lock the instance itself, as nothing will happen to it (and
6286
    # we can remove exports also for a removed instance)
6287
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6288

    
6289
  def CheckPrereq(self):
6290
    """Check prerequisites.
6291
    """
6292
    pass
6293

    
6294
  def Exec(self, feedback_fn):
6295
    """Remove any export.
6296

6297
    """
6298
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6299
    # If the instance was not found we'll try with the name that was passed in.
6300
    # This will only work if it was an FQDN, though.
6301
    fqdn_warn = False
6302
    if not instance_name:
6303
      fqdn_warn = True
6304
      instance_name = self.op.instance_name
6305

    
6306
    exportlist = self.rpc.call_export_list(self.acquired_locks[
6307
      locking.LEVEL_NODE])
6308
    found = False
6309
    for node in exportlist:
6310
      if exportlist[node].failed:
6311
        self.LogWarning("Failed to query node %s, continuing" % node)
6312
        continue
6313
      if instance_name in exportlist[node].data:
6314
        found = True
6315
        result = self.rpc.call_export_remove(node, instance_name)
6316
        if result.failed or not result.data:
6317
          logging.error("Could not remove export for instance %s"
6318
                        " on node %s", instance_name, node)
6319

    
6320
    if fqdn_warn and not found:
6321
      feedback_fn("Export not found. If trying to remove an export belonging"
6322
                  " to a deleted instance please use its Fully Qualified"
6323
                  " Domain Name.")
6324

    
6325

    
6326
class TagsLU(NoHooksLU):
6327
  """Generic tags LU.
6328

6329
  This is an abstract class which is the parent of all the other tags LUs.
6330

6331
  """
6332

    
6333
  def ExpandNames(self):
6334
    self.needed_locks = {}
6335
    if self.op.kind == constants.TAG_NODE:
6336
      name = self.cfg.ExpandNodeName(self.op.name)
6337
      if name is None:
6338
        raise errors.OpPrereqError("Invalid node name (%s)" %
6339
                                   (self.op.name,))
6340
      self.op.name = name
6341
      self.needed_locks[locking.LEVEL_NODE] = name
6342
    elif self.op.kind == constants.TAG_INSTANCE:
6343
      name = self.cfg.ExpandInstanceName(self.op.name)
6344
      if name is None:
6345
        raise errors.OpPrereqError("Invalid instance name (%s)" %
6346
                                   (self.op.name,))
6347
      self.op.name = name
6348
      self.needed_locks[locking.LEVEL_INSTANCE] = name
6349

    
6350
  def CheckPrereq(self):
6351
    """Check prerequisites.
6352

6353
    """
6354
    if self.op.kind == constants.TAG_CLUSTER:
6355
      self.target = self.cfg.GetClusterInfo()
6356
    elif self.op.kind == constants.TAG_NODE:
6357
      self.target = self.cfg.GetNodeInfo(self.op.name)
6358
    elif self.op.kind == constants.TAG_INSTANCE:
6359
      self.target = self.cfg.GetInstanceInfo(self.op.name)
6360
    else:
6361
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6362
                                 str(self.op.kind))
6363

    
6364

    
6365
class LUGetTags(TagsLU):
6366
  """Returns the tags of a given object.
6367

6368
  """
6369
  _OP_REQP = ["kind", "name"]
6370
  REQ_BGL = False
6371

    
6372
  def Exec(self, feedback_fn):
6373
    """Returns the tag list.
6374

6375
    """
6376
    return list(self.target.GetTags())
6377

    
6378

    
6379
class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results
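  # Illustrative return value for a pattern such as "web": a list of
  # (path, tag) pairs, e.g.
  #   [("/cluster", "webfarm"), ("/instances/inst1.example.com", "webserver")]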
6417

    
6418

    
6419
class LUAddTags(TagsLU):
6420
  """Sets a tag on a given object.
6421

6422
  """
6423
  _OP_REQP = ["kind", "name", "tags"]
6424
  REQ_BGL = False
6425

    
6426
  def CheckPrereq(self):
6427
    """Check prerequisites.
6428

6429
    This checks the type and length of the tag name and value.
6430

6431
    """
6432
    TagsLU.CheckPrereq(self)
6433
    for tag in self.op.tags:
6434
      objects.TaggableObject.ValidateTag(tag)
6435

    
6436
  def Exec(self, feedback_fn):
6437
    """Sets the tag.
6438

6439
    """
6440
    try:
6441
      for tag in self.op.tags:
6442
        self.target.AddTag(tag)
6443
    except errors.TagError, err:
6444
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
6445
    try:
6446
      self.cfg.Update(self.target)
6447
    except errors.ConfigurationError:
6448
      raise errors.OpRetryError("There has been a modification to the"
6449
                                " config file and the operation has been"
6450
                                " aborted. Please retry.")
6451

    
6452

    
6453
class LUDelTags(TagsLU):
6454
  """Delete a list of tags from a given object.
6455

6456
  """
6457
  _OP_REQP = ["kind", "name", "tags"]
6458
  REQ_BGL = False
6459

    
6460
  def CheckPrereq(self):
6461
    """Check prerequisites.
6462

6463
    This checks that we have the given tag.
6464

6465
    """
6466
    TagsLU.CheckPrereq(self)
6467
    for tag in self.op.tags:
6468
      objects.TaggableObject.ValidateTag(tag)
6469
    del_tags = frozenset(self.op.tags)
6470
    cur_tags = self.target.GetTags()
6471
    if not del_tags <= cur_tags:
6472
      diff_tags = del_tags - cur_tags
6473
      diff_names = ["'%s'" % tag for tag in diff_tags]
6474
      diff_names.sort()
6475
      raise errors.OpPrereqError("Tag(s) %s not found" %
6476
                                 (",".join(diff_names)))
6477

    
6478
  def Exec(self, feedback_fn):
6479
    """Remove the tag from the object.
6480

6481
    """
6482
    for tag in self.op.tags:
6483
      self.target.RemoveTag(tag)
6484
    try:
6485
      self.cfg.Update(self.target)
6486
    except errors.ConfigurationError:
6487
      raise errors.OpRetryError("There has been a modification to the"
6488
                                " config file and the operation has been"
6489
                                " aborted. Please retry.")
6490

    
6491

    
6492
class LUTestDelay(NoHooksLU):
6493
  """Sleep for a specified amount of time.
6494

6495
  This LU sleeps on the master and/or nodes for a specified amount of
6496
  time.
6497

6498
  """
6499
  _OP_REQP = ["duration", "on_master", "on_nodes"]
6500
  REQ_BGL = False
6501

    
6502
  def ExpandNames(self):
6503
    """Expand names and set required locks.
6504

6505
    This expands the node list, if any.
6506

6507
    """
6508
    self.needed_locks = {}
6509
    if self.op.on_nodes:
6510
      # _GetWantedNodes can be used here, but is not always appropriate to use
6511
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6512
      # more information.
6513
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6514
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6515

    
6516
  def CheckPrereq(self):
6517
    """Check prerequisites.
6518

6519
    """
6520

    
6521
  def Exec(self, feedback_fn):
6522
    """Do the actual sleep.
6523

6524
    """
6525
    if self.op.on_master:
6526
      if not utils.TestDelay(self.op.duration):
6527
        raise errors.OpExecError("Error during master delay test")
6528
    if self.op.on_nodes:
6529
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6530
      if not result:
6531
        raise errors.OpExecError("Complete failure from rpc call")
6532
      for node, node_result in result.items():
6533
        node_result.Raise()
6534
        if not node_result.data:
6535
          raise errors.OpExecError("Failure during rpc call to node %s,"
6536
                                   " result: %s" % (node, node_result.data))
6537

    
6538

    
6539
class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has several sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
6552
  _ALLO_KEYS = [
6553
    "mem_size", "disks", "disk_template",
6554
    "os", "tags", "nics", "vcpus", "hypervisor",
6555
    ]
6556
  _RELO_KEYS = [
6557
    "relocate_from",
6558
    ]
6559

    
6560
  def __init__(self, lu, mode, name, **kwargs):
6561
    self.lu = lu
6562
    # init buffer variables
6563
    self.in_text = self.out_text = self.in_data = self.out_data = None
6564
    # init all input fields so that pylint is happy
6565
    self.mode = mode
6566
    self.name = name
6567
    self.mem_size = self.disks = self.disk_template = None
6568
    self.os = self.tags = self.nics = self.vcpus = None
6569
    self.hypervisor = None
6570
    self.relocate_from = None
6571
    # computed fields
6572
    self.required_nodes = None
6573
    # init result fields
6574
    self.success = self.info = self.nodes = None
6575
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6576
      keyset = self._ALLO_KEYS
6577
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6578
      keyset = self._RELO_KEYS
6579
    else:
6580
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6581
                                   " IAllocator" % self.mode)
6582
    for key in kwargs:
6583
      if key not in keyset:
6584
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
6585
                                     " IAllocator" % key)
6586
      setattr(self, key, kwargs[key])
6587
    for key in keyset:
6588
      if key not in kwargs:
6589
        raise errors.ProgrammerError("Missing input parameter '%s' to"
6590
                                     " IAllocator" % key)
6591
    self._BuildInputData()
6592

    
6593
  def _ComputeClusterData(self):
6594
    """Compute the generic allocator input data.
6595

6596
    This is the data that is independent of the actual operation.
6597

6598
    """
6599
    cfg = self.lu.cfg
6600
    cluster_info = cfg.GetClusterInfo()
6601
    # cluster data
6602
    data = {
6603
      "version": constants.IALLOCATOR_VERSION,
6604
      "cluster_name": cfg.GetClusterName(),
6605
      "cluster_tags": list(cluster_info.GetTags()),
6606
      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6607
      # we don't have job IDs
6608
      }
6609
    iinfo = cfg.GetAllInstancesInfo().values()
6610
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6611

    
6612
    # node data
6613
    node_results = {}
6614
    node_list = cfg.GetNodeList()
6615

    
6616
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6617
      hypervisor_name = self.hypervisor
6618
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6619
      hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6620

    
6621
    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6622
                                           hypervisor_name)
6623
    node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6624
                       cluster_info.enabled_hypervisors)
6625
    for nname, nresult in node_data.items():
6626
      # first fill in static (config-based) values
6627
      ninfo = cfg.GetNodeInfo(nname)
6628
      pnr = {
6629
        "tags": list(ninfo.GetTags()),
6630
        "primary_ip": ninfo.primary_ip,
6631
        "secondary_ip": ninfo.secondary_ip,
6632
        "offline": ninfo.offline,
6633
        "drained": ninfo.drained,
6634
        "master_candidate": ninfo.master_candidate,
6635
        }
6636

    
6637
      if not ninfo.offline:
6638
        nresult.Raise()
6639
        if not isinstance(nresult.data, dict):
6640
          raise errors.OpExecError("Can't get data for node %s" % nname)
6641
        remote_info = nresult.data
6642
        for attr in ['memory_total', 'memory_free', 'memory_dom0',
6643
                     'vg_size', 'vg_free', 'cpu_total']:
6644
          if attr not in remote_info:
6645
            raise errors.OpExecError("Node '%s' didn't return attribute"
6646
                                     " '%s'" % (nname, attr))
6647
          try:
6648
            remote_info[attr] = int(remote_info[attr])
6649
          except ValueError, err:
6650
            raise errors.OpExecError("Node '%s' returned invalid value"
6651
                                     " for '%s': %s" % (nname, attr, err))
6652
        # compute memory used by primary instances
6653
        i_p_mem = i_p_up_mem = 0
6654
        for iinfo, beinfo in i_list:
6655
          if iinfo.primary_node == nname:
6656
            i_p_mem += beinfo[constants.BE_MEMORY]
6657
            if iinfo.name not in node_iinfo[nname].data:
6658
              i_used_mem = 0
6659
            else:
6660
              i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6661
            i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6662
            remote_info['memory_free'] -= max(0, i_mem_diff)
6663

    
6664
            if iinfo.admin_up:
6665
              i_p_up_mem += beinfo[constants.BE_MEMORY]
6666

    
6667
        # compute memory used by instances
6668
        pnr_dyn = {
6669
          "total_memory": remote_info['memory_total'],
6670
          "reserved_memory": remote_info['memory_dom0'],
6671
          "free_memory": remote_info['memory_free'],
6672
          "total_disk": remote_info['vg_size'],
6673
          "free_disk": remote_info['vg_free'],
6674
          "total_cpus": remote_info['cpu_total'],
6675
          "i_pri_memory": i_p_mem,
6676
          "i_pri_up_memory": i_p_up_mem,
6677
          }
6678
        pnr.update(pnr_dyn)
6679

    
6680
      node_results[nname] = pnr
6681
    data["nodes"] = node_results
6682

    
6683
    # instance data
6684
    instance_data = {}
6685
    for iinfo, beinfo in i_list:
6686
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6687
                  for n in iinfo.nics]
6688
      pir = {
6689
        "tags": list(iinfo.GetTags()),
6690
        "admin_up": iinfo.admin_up,
6691
        "vcpus": beinfo[constants.BE_VCPUS],
6692
        "memory": beinfo[constants.BE_MEMORY],
6693
        "os": iinfo.os,
6694
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6695
        "nics": nic_data,
6696
        "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6697
        "disk_template": iinfo.disk_template,
6698
        "hypervisor": iinfo.hypervisor,
6699
        }
6700
      instance_data[iinfo.name] = pir
6701

    
6702
    data["instances"] = instance_data
6703

    
6704
    self.in_data = data
6705

    
6706
  def _AddNewInstance(self):
6707
    """Add new instance data to allocator structure.
6708

6709
    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.
6711

6712
    The checks for the completeness of the opcode must have already been
6713
    done.
6714

6715
    """
6716
    data = self.in_data
6717

    
6718
    disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6719

    
6720
    if self.disk_template in constants.DTS_NET_MIRROR:
6721
      self.required_nodes = 2
6722
    else:
6723
      self.required_nodes = 1
6724
    request = {
6725
      "type": "allocate",
6726
      "name": self.name,
6727
      "disk_template": self.disk_template,
6728
      "tags": self.tags,
6729
      "os": self.os,
6730
      "vcpus": self.vcpus,
6731
      "memory": self.mem_size,
6732
      "disks": self.disks,
6733
      "disk_space_total": disk_space,
6734
      "nics": self.nics,
6735
      "required_nodes": self.required_nodes,
6736
      }
6737
    data["request"] = request
6738
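    # Illustrative "request" section for the allocate mode (all values are
    # made up):
    #   {"type": "allocate", "name": "inst1.example.com",
    #    "disk_template": "drbd", "tags": [], "os": "debootstrap",
    #    "vcpus": 1, "memory": 512,
    #    "disks": [{"size": 1024, "mode": "w"}],
    #    "disk_space_total": 2176, "nics": [...], "required_nodes": 2}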

    
6739
  def _AddRelocateInstance(self):
6740
    """Add relocate instance data to allocator structure.
6741

6742
    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.
6744

6745
    The checks for the completeness of the opcode must have already been
6746
    done.
6747

6748
    """
6749
    instance = self.lu.cfg.GetInstanceInfo(self.name)
6750
    if instance is None:
6751
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
6752
                                   " IAllocator" % self.name)
6753

    
6754
    if instance.disk_template not in constants.DTS_NET_MIRROR:
6755
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6756

    
6757
    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")
6759

    
6760
    self.required_nodes = 1
6761
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
6762
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6763

    
6764
    request = {
6765
      "type": "relocate",
6766
      "name": self.name,
6767
      "disk_space_total": disk_space,
6768
      "required_nodes": self.required_nodes,
6769
      "relocate_from": self.relocate_from,
6770
      }
6771
    self.in_data["request"] = request
6772

    
6773
  def _BuildInputData(self):
6774
    """Build input data structures.
6775

6776
    """
6777
    self._ComputeClusterData()
6778

    
6779
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6780
      self._AddNewInstance()
6781
    else:
6782
      self._AddRelocateInstance()
6783

    
6784
    self.in_text = serializer.Dump(self.in_data)
6785

    
6786
  def Run(self, name, validate=True, call_fn=None):
6787
    """Run an instance allocator and return the results.
6788

6789
    """
6790
    if call_fn is None:
6791
      call_fn = self.lu.rpc.call_iallocator_runner
6792
    data = self.in_text
6793

    
6794
    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6795
    result.Raise()
6796

    
6797
    if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6798
      raise errors.OpExecError("Invalid result from master iallocator runner")
6799

    
6800
    rcode, stdout, stderr, fail = result.data
6801

    
6802
    if rcode == constants.IARUN_NOTFOUND:
6803
      raise errors.OpExecError("Can't find allocator '%s'" % name)
6804
    elif rcode == constants.IARUN_FAILURE:
6805
      raise errors.OpExecError("Instance allocator call failed: %s,"
6806
                               " output: %s" % (fail, stdout+stderr))
6807
    self.out_text = stdout
6808
    if validate:
6809
      self._ValidateResult()
6810

    
6811
  def _ValidateResult(self):
6812
    """Process the allocator results.
6813

6814
    This will process and if successful save the result in
6815
    self.out_data and the other parameters.
6816

6817
    """
6818
    try:
6819
      rdict = serializer.Load(self.out_text)
6820
    except Exception, err:
6821
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6822

    
6823
    if not isinstance(rdict, dict):
6824
      raise errors.OpExecError("Can't parse iallocator results: not a dict")
6825

    
6826
    for key in "success", "info", "nodes":
6827
      if key not in rdict:
6828
        raise errors.OpExecError("Can't parse iallocator results:"
6829
                                 " missing key '%s'" % key)
6830
      setattr(self, key, rdict[key])
6831

    
6832
    if not isinstance(rdict["nodes"], list):
6833
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6834
                               " is not a list")
6835
    self.out_data = rdict
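    # For reference, a well-formed allocator reply parsed above looks
    # roughly like (illustrative values):
    #   {"success": True, "info": "allocation successful",
    #    "nodes": ["node1.example.com", "node2.example.com"]}
    # i.e. exactly the keys checked for in the loop above.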
6836

    
6837

    
6838
class LUTestAllocator(NoHooksLU):
6839
  """Run allocator tests.
6840

6841
  This LU runs the allocator tests
6842

6843
  """
6844
  _OP_REQP = ["direction", "mode", "name"]
6845

    
6846
  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and the
    test mode.

    """
6852
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6853
      for attr in ["name", "mem_size", "disks", "disk_template",
6854
                   "os", "tags", "nics", "vcpus"]:
6855
        if not hasattr(self.op, attr):
6856
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6857
                                     attr)
6858
      iname = self.cfg.ExpandInstanceName(self.op.name)
6859
      if iname is not None:
6860
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6861
                                   iname)
6862
      if not isinstance(self.op.nics, list):
6863
        raise errors.OpPrereqError("Invalid parameter 'nics'")
6864
      for row in self.op.nics:
6865
        if (not isinstance(row, dict) or
6866
            "mac" not in row or
6867
            "ip" not in row or
6868
            "bridge" not in row):
6869
          raise errors.OpPrereqError("Invalid contents of the"
6870
                                     " 'nics' parameter")
6871
      if not isinstance(self.op.disks, list):
6872
        raise errors.OpPrereqError("Invalid parameter 'disks'")
6873
      for row in self.op.disks:
6874
        if (not isinstance(row, dict) or
6875
            "size" not in row or
6876
            not isinstance(row["size"], int) or
6877
            "mode" not in row or
6878
            row["mode"] not in ['r', 'w']):
6879
          raise errors.OpPrereqError("Invalid contents of the"
6880
                                     " 'disks' parameter")
6881
      if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
6882
        self.op.hypervisor = self.cfg.GetHypervisorType()
6883
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
6884
      if not hasattr(self.op, "name"):
6885
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
6886
      fname = self.cfg.ExpandInstanceName(self.op.name)
6887
      if fname is None:
6888
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
6889
                                   self.op.name)
6890
      self.op.name = fname
6891
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
6892
    else:
6893
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
6894
                                 self.op.mode)
6895

    
6896
    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
6897
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
6898
        raise errors.OpPrereqError("Missing allocator name")
6899
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
6900
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
6901
                                 self.op.direction)
6902

    
6903
  def Exec(self, feedback_fn):
6904
    """Run the allocator test.
6905

6906
    """
6907
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6908
      ial = IAllocator(self,
6909
                       mode=self.op.mode,
6910
                       name=self.op.name,
6911
                       mem_size=self.op.mem_size,
6912
                       disks=self.op.disks,
6913
                       disk_template=self.op.disk_template,
6914
                       os=self.op.os,
6915
                       tags=self.op.tags,
6916
                       nics=self.op.nics,
6917
                       vcpus=self.op.vcpus,
6918
                       hypervisor=self.op.hypervisor,
6919
                       )
6920
    else:
6921
      ial = IAllocator(self,
6922
                       mode=self.op.mode,
6923
                       name=self.op.name,
6924
                       relocate_from=list(self.relocate_from),
6925
                       )
6926

    
6927
    if self.op.direction == constants.IALLOCATOR_DIR_IN:
6928
      result = ial.in_text
6929
    else:
6930
      ial.Run(self.op.allocator, validate=False)
6931
      result = ial.out_text
6932
    return result
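    # Note: with direction "in" the LU only returns the serialized input
    # that would be fed to the allocator; with direction "out" it actually
    # runs the named allocator and returns its raw output for inspection.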