
1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33
import logging
34
import copy
35
import random
36

    
37
from ganeti import ssh
38
from ganeti import utils
39
from ganeti import errors
40
from ganeti import hypervisor
41
from ganeti import locking
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import opcodes
45
from ganeti import serializer
46
from ganeti import ssconf
47

    
48

    
49
class LogicalUnit(object):
50
  """Logical Unit base class.
51

52
  Subclasses must follow these rules:
53
    - implement ExpandNames
54
    - implement CheckPrereq
55
    - implement Exec
56
    - implement BuildHooksEnv
57
    - redefine HPATH and HTYPE
58
    - optionally redefine their run requirements:
59
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60

61
  Note that all commands require root permissions.
62

63
  """
64
  HPATH = None
65
  HTYPE = None
66
  _OP_REQP = []
67
  REQ_BGL = True
68

    
69
  def __init__(self, processor, op, context, rpc):
70
    """Constructor for LogicalUnit.
71

72
    This needs to be overridden in derived classes in order to check op
73
    validity.
74

75
    """
76
    self.proc = processor
77
    self.op = op
78
    self.cfg = context.cfg
79
    self.context = context
80
    self.rpc = rpc
81
    # Dicts used to declare locking needs to mcpu
82
    self.needed_locks = None
83
    self.acquired_locks = {}
84
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85
    self.add_locks = {}
86
    self.remove_locks = {}
87
    # Used to force good behavior when calling helper functions
88
    self.recalculate_locks = {}
89
    self.__ssh = None
90
    # logging
91
    self.LogWarning = processor.LogWarning
92
    self.LogInfo = processor.LogInfo
93

    
94
    for attr_name in self._OP_REQP:
95
      attr_val = getattr(op, attr_name, None)
96
      if attr_val is None:
97
        raise errors.OpPrereqError("Required parameter '%s' missing" %
98
                                   attr_name)
99
    self.CheckArguments()
100

    
101
  def __GetSSH(self):
102
    """Returns the SshRunner object
103

104
    """
105
    if not self.__ssh:
106
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
107
    return self.__ssh
108

    
109
  ssh = property(fget=__GetSSH)
110

    
111
  def CheckArguments(self):
112
    """Check syntactic validity for the opcode arguments.
113

114
    This method is for doing a simple syntactic check and ensuring the
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing it separately is better because:
118

119
      - ExpandNames is left as purely a lock-related function
120
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)
122

123
    The function is allowed to change the self.op attribute so that
    later methods no longer need to worry about missing parameters.
125

126
    """
127
    pass
128

    
129
  def ExpandNames(self):
130
    """Expand names for this LU.
131

132
    This method is called before starting to execute the opcode, and it should
133
    update all the parameters of the opcode to their canonical form (e.g. a
134
    short node name must be fully expanded after this method has successfully
135
    completed). This way locking, hooks, logging, ecc. can work correctly.
136

137
    LUs which implement this method must also populate the self.needed_locks
138
    member, as a dict with lock levels as keys, and a list of needed lock names
139
    as values. Rules:
140

141
      - use an empty dict if you don't need any lock
142
      - if you don't need any lock at a particular level omit that level
143
      - don't put anything for the BGL level
144
      - if you want all locks at a level use locking.ALL_SET as a value
145

146
    If you need to share locks (rather than acquire them exclusively) at one
147
    level you can modify self.share_locks, setting a true value (usually 1) for
148
    that level. By default locks are not shared.
149

150
    Examples::
151

152
      # Acquire all nodes and one instance
153
      self.needed_locks = {
154
        locking.LEVEL_NODE: locking.ALL_SET,
155
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
156
      }
157
      # Acquire just two nodes
158
      self.needed_locks = {
159
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
160
      }
161
      # Acquire no locks
162
      self.needed_locks = {} # No, you can't leave it to the default value None
163

164
    """
165
    # The implementation of this method is mandatory only if the new LU is
166
    # concurrent, so that old LUs don't need to be changed all at the same
167
    # time.
168
    if self.REQ_BGL:
169
      self.needed_locks = {} # Exclusive LUs don't need locks.
170
    else:
171
      raise NotImplementedError
172

    
173
  def DeclareLocks(self, level):
174
    """Declare LU locking needs for a level
175

176
    While most LUs can just declare their locking needs at ExpandNames time,
177
    sometimes there's the need to calculate some locks after having acquired
178
    the ones before. This function is called just before acquiring locks at a
179
    particular level, but after acquiring the ones at lower levels, and permits
180
    such calculations. It can be used to modify self.needed_locks, and by
181
    default it does nothing.
182

183
    This function is only called if you have something already set in
184
    self.needed_locks for the level.
185

186
    @param level: Locking level which is going to be locked
187
    @type level: member of ganeti.locking.LEVELS
188

189
    """
190

    
191
  def CheckPrereq(self):
192
    """Check prerequisites for this LU.
193

194
    This method should check that the prerequisites for the execution
195
    of this LU are fulfilled. It can do internode communication, but
196
    it should be idempotent - no cluster or system changes are
197
    allowed.
198

199
    The method should raise errors.OpPrereqError in case something is
200
    not fulfilled. Its return value is ignored.
201

202
    This method should also update all the parameters of the opcode to
203
    their canonical form if it hasn't been done by ExpandNames before.
204

205
    """
206
    raise NotImplementedError
207

    
208
  def Exec(self, feedback_fn):
209
    """Execute the LU.
210

211
    This method should implement the actual work. It should raise
212
    errors.OpExecError for failures that are somewhat dealt with in
213
    code, or expected.
214

215
    """
216
    raise NotImplementedError
217

    
218
  def BuildHooksEnv(self):
219
    """Build hooks environment for this LU.
220

221
    This method should return a three-node tuple consisting of: a dict
222
    containing the environment that will be used for running the
223
    specific hook for this LU, a list of node names on which the hook
224
    should run before the execution, and a list of node names on which
225
    the hook should run after the execution.
226

227
    The keys of the dict must not have 'GANETI_' prefixed as this will
228
    be handled in the hooks runner. Also note additional keys will be
229
    added by the hooks runner. If the LU doesn't define any
230
    environment, an empty dict (and not None) should be returned.
231

232
    If there are no nodes to return, an empty list (and not None) should be
    used.
233

234
    Note that if the HPATH for a LU class is None, this function will
235
    not be called.
236

237
    """
238
    raise NotImplementedError
239

    
240
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
241
    """Notify the LU about the results of its hooks.
242

243
    This method is called every time a hooks phase is executed, and notifies
244
    the Logical Unit about the hooks' result. The LU can then use it to alter
245
    its result based on the hooks.  By default the method does nothing and the
246
    previous result is passed back unchanged but any LU can define it if it
247
    wants to use the local cluster hook-scripts somehow.
248

249
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
250
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
251
    @param hook_results: the results of the multi-node hooks rpc call
252
    @param feedback_fn: function used to send feedback back to the caller
253
    @param lu_result: the previous Exec result this LU had, or None
254
        in the PRE phase
255
    @return: the new Exec result, based on the previous result
256
        and hook results
257

258
    """
259
    return lu_result
260

    
261
  def _ExpandAndLockInstance(self):
262
    """Helper function to expand and lock an instance.
263

264
    Many LUs that work on an instance take its name in self.op.instance_name
265
    and need to expand it and then declare the expanded name for locking. This
266
    function does it, and then updates self.op.instance_name to the expanded
267
    name. It also initializes needed_locks as a dict, if this hasn't been done
268
    before.
269

270
    """
271
    if self.needed_locks is None:
272
      self.needed_locks = {}
273
    else:
274
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
275
        "_ExpandAndLockInstance called with instance-level locks set"
276
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
277
    if expanded_name is None:
278
      raise errors.OpPrereqError("Instance '%s' not known" %
279
                                  self.op.instance_name)
280
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
281
    self.op.instance_name = expanded_name
282

    
283
  def _LockInstancesNodes(self, primary_only=False):
284
    """Helper function to declare instances' nodes for locking.
285

286
    This function should be called after locking one or more instances to lock
287
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
288
    with all primary or secondary nodes for instances already locked and
289
    present in self.needed_locks[locking.LEVEL_INSTANCE].
290

291
    It should be called from DeclareLocks, and for safety only works if
292
    self.recalculate_locks[locking.LEVEL_NODE] is set.
293

294
    In the future it may grow parameters to lock only some instances' nodes, or
    to lock only primary or secondary nodes, if needed.
296

297
    It should be called in DeclareLocks in a way similar to::
298

299
      if level == locking.LEVEL_NODE:
300
        self._LockInstancesNodes()
301

302
    @type primary_only: boolean
303
    @param primary_only: only lock primary nodes of locked instances
304

305
    """
306
    assert locking.LEVEL_NODE in self.recalculate_locks, \
307
      "_LockInstancesNodes helper function called with no nodes to recalculate"
308

    
309
    # TODO: check if we've really been called with the instance locks held
310

    
311
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
312
    # future we might want to have different behaviors depending on the value
313
    # of self.recalculate_locks[locking.LEVEL_NODE]
314
    wanted_nodes = []
315
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
316
      instance = self.context.cfg.GetInstanceInfo(instance_name)
317
      wanted_nodes.append(instance.primary_node)
318
      if not primary_only:
319
        wanted_nodes.extend(instance.secondary_nodes)
320

    
321
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
322
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
323
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
324
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
325

    
326
    del self.recalculate_locks[locking.LEVEL_NODE]
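
# Illustrative sketch only (not part of Ganeti): a minimal concurrent LU
# showing how the rules from the LogicalUnit docstring fit together. It drops
# the Big Ganeti Lock, expands and locks its target instance in ExpandNames,
# and computes the node locks in DeclareLocks via _LockInstancesNodes. The
# class name and its _OP_REQP contents are assumptions made for illustration.
class LUExampleNoop(LogicalUnit):
  """Example LU that locks one instance and its nodes, then does nothing."""
  HPATH = None
  HTYPE = None
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    # expand self.op.instance_name and declare the instance-level lock
    self._ExpandAndLockInstance()
    # node locks are computed later, once the instance lock is held
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    # idempotent checks only; the instance name is already expanded here
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)

  def Exec(self, feedback_fn):
    feedback_fn("Would operate on %s" % self.instance.name)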
327

    
328

    
329
class NoHooksLU(LogicalUnit):
330
  """Simple LU which runs no hooks.
331

332
  This LU is intended as a parent for other LogicalUnits which will
333
  run no hooks, in order to reduce duplicate code.
334

335
  """
336
  HPATH = None
337
  HTYPE = None
338

    
339

    
340
def _GetWantedNodes(lu, nodes):
341
  """Returns list of checked and expanded node names.
342

343
  @type lu: L{LogicalUnit}
344
  @param lu: the logical unit on whose behalf we execute
345
  @type nodes: list
346
  @param nodes: list of node names or None for all nodes
347
  @rtype: list
348
  @return: the list of nodes, sorted
349
  @raise errors.OpPrereqError: if the nodes parameter is of the wrong type
350

351
  """
352
  if not isinstance(nodes, list):
353
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
354

    
355
  if not nodes:
356
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
357
      " non-empty list of nodes whose name is to be expanded.")
358

    
359
  wanted = []
360
  for name in nodes:
361
    node = lu.cfg.ExpandNodeName(name)
362
    if node is None:
363
      raise errors.OpPrereqError("No such node name '%s'" % name)
364
    wanted.append(node)
365

    
366
  return utils.NiceSort(wanted)
367

    
368

    
369
def _GetWantedInstances(lu, instances):
370
  """Returns list of checked and expanded instance names.
371

372
  @type lu: L{LogicalUnit}
373
  @param lu: the logical unit on whose behalf we execute
374
  @type instances: list
375
  @param instances: list of instance names or None for all instances
376
  @rtype: list
377
  @return: the list of instances, sorted
378
  @raise errors.OpPrereqError: if the instances parameter is wrong type
379
  @raise errors.OpPrereqError: if any of the passed instances is not found
380

381
  """
382
  if not isinstance(instances, list):
383
    raise errors.OpPrereqError("Invalid argument type 'instances'")
384

    
385
  if instances:
386
    wanted = []
387

    
388
    for name in instances:
389
      instance = lu.cfg.ExpandInstanceName(name)
390
      if instance is None:
391
        raise errors.OpPrereqError("No such instance name '%s'" % name)
392
      wanted.append(instance)
393

    
394
  else:
395
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
396
  return wanted
397

    
398

    
399
def _CheckOutputFields(static, dynamic, selected):
400
  """Checks whether all selected fields are valid.
401

402
  @type static: L{utils.FieldSet}
403
  @param static: static fields set
404
  @type dynamic: L{utils.FieldSet}
405
  @param dynamic: dynamic fields set
406

407
  """
408
  f = utils.FieldSet()
409
  f.Extend(static)
410
  f.Extend(dynamic)
411

    
412
  delta = f.NonMatching(selected)
413
  if delta:
414
    raise errors.OpPrereqError("Unknown output fields selected: %s"
415
                               % ",".join(delta))
416
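
# Illustrative usage only (the field names are hypothetical): a query-style LU
# typically validates the requested output fields against the sets it supports
# before executing, e.g.
#
#   _CheckOutputFields(static=utils.FieldSet("name", "pinst_cnt"),
#                      dynamic=utils.FieldSet("dfree", "mfree"),
#                      selected=self.op.output_fields)
#
# which raises errors.OpPrereqError listing any selected field that matches
# neither set.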

    
417

    
418
def _CheckBooleanOpField(op, name):
419
  """Validates boolean opcode parameters.
420

421
  This will ensure that an opcode parameter is either a boolean value,
422
  or None (but that it always exists).
423

424
  """
425
  val = getattr(op, name, None)
426
  if not (val is None or isinstance(val, bool)):
427
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
428
                               (name, str(val)))
429
  setattr(op, name, val)
430

    
431

    
432
def _CheckNodeOnline(lu, node):
433
  """Ensure that a given node is online.
434

435
  @param lu: the LU on behalf of which we make the check
436
  @param node: the node to check
437
  @raise errors.OpPrereqError: if the node is offline
438

439
  """
440
  if lu.cfg.GetNodeInfo(node).offline:
441
    raise errors.OpPrereqError("Can't use offline node %s" % node)
442

    
443

    
444
def _CheckNodeNotDrained(lu, node):
445
  """Ensure that a given node is not drained.
446

447
  @param lu: the LU on behalf of which we make the check
448
  @param node: the node to check
449
  @raise errors.OpPrereqError: if the node is drained
450

451
  """
452
  if lu.cfg.GetNodeInfo(node).drained:
453
    raise errors.OpPrereqError("Can't use drained node %s" % node)
454

    
455

    
456
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
457
                          memory, vcpus, nics, disk_template, disks):
458
  """Builds instance related env variables for hooks
459

460
  This builds the hook environment from individual variables.
461

462
  @type name: string
463
  @param name: the name of the instance
464
  @type primary_node: string
465
  @param primary_node: the name of the instance's primary node
466
  @type secondary_nodes: list
467
  @param secondary_nodes: list of secondary nodes as strings
468
  @type os_type: string
469
  @param os_type: the name of the instance's OS
470
  @type status: boolean
471
  @param status: the should_run status of the instance
472
  @type memory: string
473
  @param memory: the memory size of the instance
474
  @type vcpus: string
475
  @param vcpus: the count of VCPUs the instance has
476
  @type nics: list
477
  @param nics: list of tuples (ip, bridge, mac) representing
478
      the NICs the instance has
479
  @type disk_template: string
480
  @param disk_template: the disk template of the instance
481
  @type disks: list
482
  @param disks: the list of (size, mode) pairs
483
  @rtype: dict
484
  @return: the hook environment for this instance
485

486
  """
487
  if status:
488
    str_status = "up"
489
  else:
490
    str_status = "down"
491
  env = {
492
    "OP_TARGET": name,
493
    "INSTANCE_NAME": name,
494
    "INSTANCE_PRIMARY": primary_node,
495
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
496
    "INSTANCE_OS_TYPE": os_type,
497
    "INSTANCE_STATUS": str_status,
498
    "INSTANCE_MEMORY": memory,
499
    "INSTANCE_VCPUS": vcpus,
500
    "INSTANCE_DISK_TEMPLATE": disk_template,
501
  }
502

    
503
  if nics:
504
    nic_count = len(nics)
505
    for idx, (ip, bridge, mac) in enumerate(nics):
506
      if ip is None:
507
        ip = ""
508
      env["INSTANCE_NIC%d_IP" % idx] = ip
509
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
510
      env["INSTANCE_NIC%d_MAC" % idx] = mac
511
  else:
512
    nic_count = 0
513

    
514
  env["INSTANCE_NIC_COUNT"] = nic_count
515

    
516
  if disks:
517
    disk_count = len(disks)
518
    for idx, (size, mode) in enumerate(disks):
519
      env["INSTANCE_DISK%d_SIZE" % idx] = size
520
      env["INSTANCE_DISK%d_MODE" % idx] = mode
521
  else:
522
    disk_count = 0
523

    
524
  env["INSTANCE_DISK_COUNT"] = disk_count
525

    
526
  return env
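
# Illustrative only (all values are made up): a call such as
#
#   _BuildInstanceHookEnv("inst1.example.com", "node1", ["node2"], "debian",
#                         True, 512, 1,
#                         [("198.51.100.10", "xen-br0", "aa:00:00:00:00:01")],
#                         "drbd", [(10240, "rw")])
#
# would yield, among others, the keys OP_TARGET, INSTANCE_PRIMARY ("node1"),
# INSTANCE_SECONDARIES ("node2"), INSTANCE_STATUS ("up"),
# INSTANCE_NIC_COUNT (1), INSTANCE_NIC0_IP, INSTANCE_NIC0_BRIDGE,
# INSTANCE_NIC0_MAC, INSTANCE_DISK_COUNT (1), INSTANCE_DISK0_SIZE and
# INSTANCE_DISK0_MODE, which the hooks runner then prefixes with GANETI_.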
527

    
528

    
529
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
530
  """Builds instance related env variables for hooks from an object.
531

532
  @type lu: L{LogicalUnit}
533
  @param lu: the logical unit on whose behalf we execute
534
  @type instance: L{objects.Instance}
535
  @param instance: the instance for which we should build the
536
      environment
537
  @type override: dict
538
  @param override: dictionary with key/values that will override
539
      our values
540
  @rtype: dict
541
  @return: the hook environment dictionary
542

543
  """
544
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
545
  args = {
546
    'name': instance.name,
547
    'primary_node': instance.primary_node,
548
    'secondary_nodes': instance.secondary_nodes,
549
    'os_type': instance.os,
550
    'status': instance.admin_up,
551
    'memory': bep[constants.BE_MEMORY],
552
    'vcpus': bep[constants.BE_VCPUS],
553
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
554
    'disk_template': instance.disk_template,
555
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
556
  }
557
  if override:
558
    args.update(override)
559
  return _BuildInstanceHookEnv(**args)
560

    
561

    
562
def _AdjustCandidatePool(lu):
563
  """Adjust the candidate pool after node operations.
564

565
  """
566
  mod_list = lu.cfg.MaintainCandidatePool()
567
  if mod_list:
568
    lu.LogInfo("Promoted nodes to master candidate role: %s",
569
               ", ".join(node.name for node in mod_list))
570
    for name in mod_list:
571
      lu.context.ReaddNode(name)
572
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
573
  if mc_now > mc_max:
574
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
575
               (mc_now, mc_max))
576

    
577

    
578
def _CheckInstanceBridgesExist(lu, instance):
  """Check that the bridges needed by an instance exist.
580

581
  """
582
  # check bridges existence
583
  brlist = [nic.bridge for nic in instance.nics]
584
  result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
585
  result.Raise()
586
  if not result.data:
587
    raise errors.OpPrereqError("One or more target bridges %s does not"
588
                               " exist on destination node '%s'" %
589
                               (brlist, instance.primary_node))
590

    
591

    
592
class LUDestroyCluster(NoHooksLU):
593
  """Logical unit for destroying the cluster.
594

595
  """
596
  _OP_REQP = []
597

    
598
  def CheckPrereq(self):
599
    """Check prerequisites.
600

601
    This checks whether the cluster is empty.
602

603
    Any errors are signalled by raising errors.OpPrereqError.
604

605
    """
606
    master = self.cfg.GetMasterNode()
607

    
608
    nodelist = self.cfg.GetNodeList()
609
    if len(nodelist) != 1 or nodelist[0] != master:
610
      raise errors.OpPrereqError("There are still %d node(s) in"
611
                                 " this cluster." % (len(nodelist) - 1))
612
    instancelist = self.cfg.GetInstanceList()
613
    if instancelist:
614
      raise errors.OpPrereqError("There are still %d instance(s) in"
615
                                 " this cluster." % len(instancelist))
616

    
617
  def Exec(self, feedback_fn):
618
    """Destroys the cluster.
619

620
    """
621
    master = self.cfg.GetMasterNode()
622
    result = self.rpc.call_node_stop_master(master, False)
623
    result.Raise()
624
    if not result.data:
625
      raise errors.OpExecError("Could not disable the master role")
626
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
627
    utils.CreateBackup(priv_key)
628
    utils.CreateBackup(pub_key)
629
    return master
630

    
631

    
632
class LUVerifyCluster(LogicalUnit):
633
  """Verifies the cluster status.
634

635
  """
636
  HPATH = "cluster-verify"
637
  HTYPE = constants.HTYPE_CLUSTER
638
  _OP_REQP = ["skip_checks"]
639
  REQ_BGL = False
640

    
641
  def ExpandNames(self):
642
    self.needed_locks = {
643
      locking.LEVEL_NODE: locking.ALL_SET,
644
      locking.LEVEL_INSTANCE: locking.ALL_SET,
645
    }
646
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
647

    
648
  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
649
                  node_result, feedback_fn, master_files,
650
                  drbd_map):
651
    """Run multiple tests against a node.
652

653
    Test list:
654

655
      - compares ganeti version
656
      - checks vg existence and size > 20G
657
      - checks config file checksum
658
      - checks ssh to other nodes
659

660
    @type nodeinfo: L{objects.Node}
661
    @param nodeinfo: the node to check
662
    @param file_list: required list of files
663
    @param local_cksum: dictionary of local files and their checksums
664
    @param node_result: the results from the node
665
    @param feedback_fn: function used to accumulate results
666
    @param master_files: list of files that only masters should have
667
    @param drbd_map: the used drbd minors for this node, in
        the form of minor: (instance, must_exist), which correspond to instances
        and their running status
670

671
    """
672
    node = nodeinfo.name
673

    
674
    # main result, node_result should be a non-empty dict
675
    if not node_result or not isinstance(node_result, dict):
676
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
677
      return True
678

    
679
    # compares ganeti version
680
    local_version = constants.PROTOCOL_VERSION
681
    remote_version = node_result.get('version', None)
682
    if not (remote_version and isinstance(remote_version, (list, tuple)) and
683
            len(remote_version) == 2):
684
      feedback_fn("  - ERROR: connection to %s failed" % (node))
685
      return True
686

    
687
    if local_version != remote_version[0]:
688
      feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
689
                  " node %s %s" % (local_version, node, remote_version[0]))
690
      return True
691

    
692
    # node seems compatible, we can actually try to look into its results
693

    
694
    bad = False
695

    
696
    # full package version
697
    if constants.RELEASE_VERSION != remote_version[1]:
698
      feedback_fn("  - WARNING: software version mismatch: master %s,"
699
                  " node %s %s" %
700
                  (constants.RELEASE_VERSION, node, remote_version[1]))
701

    
702
    # checks vg existence and size > 20G
703

    
704
    vglist = node_result.get(constants.NV_VGLIST, None)
705
    if not vglist:
706
      feedback_fn("  - ERROR: unable to check volume groups on node %s." %
707
                      (node,))
708
      bad = True
709
    else:
710
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
711
                                            constants.MIN_VG_SIZE)
712
      if vgstatus:
713
        feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
714
        bad = True
715

    
716
    # checks config file checksum
717

    
718
    remote_cksum = node_result.get(constants.NV_FILELIST, None)
719
    if not isinstance(remote_cksum, dict):
720
      bad = True
721
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
722
    else:
723
      for file_name in file_list:
724
        node_is_mc = nodeinfo.master_candidate
725
        must_have_file = file_name not in master_files
726
        if file_name not in remote_cksum:
727
          if node_is_mc or must_have_file:
728
            bad = True
729
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
730
        elif remote_cksum[file_name] != local_cksum[file_name]:
731
          if node_is_mc or must_have_file:
732
            bad = True
733
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
734
          else:
735
            # not candidate and this is not a must-have file
736
            bad = True
737
            feedback_fn("  - ERROR: non master-candidate has old/wrong file"
738
                        " '%s'" % file_name)
739
        else:
740
          # all good, except non-master/non-must have combination
741
          if not node_is_mc and not must_have_file:
742
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
743
                        " candidates" % file_name)
744

    
745
    # checks ssh to any
746

    
747
    if constants.NV_NODELIST not in node_result:
748
      bad = True
749
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
750
    else:
751
      if node_result[constants.NV_NODELIST]:
752
        bad = True
753
        for node in node_result[constants.NV_NODELIST]:
754
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
755
                          (node, node_result[constants.NV_NODELIST][node]))
756

    
757
    if constants.NV_NODENETTEST not in node_result:
758
      bad = True
759
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
760
    else:
761
      if node_result[constants.NV_NODENETTEST]:
762
        bad = True
763
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
764
        for node in nlist:
765
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
766
                          (node, node_result[constants.NV_NODENETTEST][node]))
767

    
768
    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
769
    if isinstance(hyp_result, dict):
770
      for hv_name, hv_result in hyp_result.iteritems():
771
        if hv_result is not None:
772
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
773
                      (hv_name, hv_result))
774

    
775
    # check used drbd list
776
    used_minors = node_result.get(constants.NV_DRBDLIST, [])
777
    if not isinstance(used_minors, (tuple, list)):
778
      feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
779
                  str(used_minors))
780
    else:
781
      for minor, (iname, must_exist) in drbd_map.items():
782
        if minor not in used_minors and must_exist:
783
          feedback_fn("  - ERROR: drbd minor %d of instance %s is not active" %
784
                      (minor, iname))
785
          bad = True
786
      for minor in used_minors:
787
        if minor not in drbd_map:
788
          feedback_fn("  - ERROR: unallocated drbd minor %d is in use" % minor)
789
          bad = True
790

    
791
    return bad
792

    
793
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
794
                      node_instance, feedback_fn, n_offline):
795
    """Verify an instance.
796

797
    This function checks to see if the required block devices are
798
    available on the instance's node.
799

800
    """
801
    bad = False
802

    
803
    node_current = instanceconfig.primary_node
804

    
805
    node_vol_should = {}
806
    instanceconfig.MapLVsByNode(node_vol_should)
807

    
808
    for node in node_vol_should:
809
      if node in n_offline:
810
        # ignore missing volumes on offline nodes
811
        continue
812
      for volume in node_vol_should[node]:
813
        if node not in node_vol_is or volume not in node_vol_is[node]:
814
          feedback_fn("  - ERROR: volume %s missing on node %s" %
815
                          (volume, node))
816
          bad = True
817

    
818
    if instanceconfig.admin_up:
819
      if ((node_current not in node_instance or
820
          not instance in node_instance[node_current]) and
821
          node_current not in n_offline):
822
        feedback_fn("  - ERROR: instance %s not running on node %s" %
823
                        (instance, node_current))
824
        bad = True
825

    
826
    for node in node_instance:
827
      if (not node == node_current):
828
        if instance in node_instance[node]:
829
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
830
                          (instance, node))
831
          bad = True
832

    
833
    return bad
834

    
835
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
836
    """Verify if there are any unknown volumes in the cluster.
837

838
    The .os, .swap and backup volumes are ignored. All other volumes are
839
    reported as unknown.
840

841
    """
842
    bad = False
843

    
844
    for node in node_vol_is:
845
      for volume in node_vol_is[node]:
846
        if node not in node_vol_should or volume not in node_vol_should[node]:
847
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
848
                      (volume, node))
849
          bad = True
850
    return bad
851

    
852
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
853
    """Verify the list of running instances.
854

855
    This checks what instances are running but unknown to the cluster.
856

857
    """
858
    bad = False
859
    for node in node_instance:
860
      for runninginstance in node_instance[node]:
861
        if runninginstance not in instancelist:
862
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
863
                          (runninginstance, node))
864
          bad = True
865
    return bad
866

    
867
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
868
    """Verify N+1 Memory Resilience.
869

870
    Check that if one single node dies we can still start all the instances it
871
    was primary for.
872

873
    """
874
    bad = False
875

    
876
    for node, nodeinfo in node_info.iteritems():
877
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it would have to run, should a
      # single other node in the cluster fail.
880
      # FIXME: not ready for failover to an arbitrary node
881
      # FIXME: does not support file-backed instances
882
      # WARNING: we currently take into account down instances as well as up
883
      # ones, considering that even if they're down someone might want to start
884
      # them even in the event of a node failure.
885
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
886
        needed_mem = 0
887
        for instance in instances:
888
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
889
          if bep[constants.BE_AUTO_BALANCE]:
890
            needed_mem += bep[constants.BE_MEMORY]
891
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
893
                      " failovers should node %s fail" % (node, prinode))
894
          bad = True
895
    return bad
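
  # Worked example for the check above (numbers are made up): suppose node A
  # is secondary for two auto-balanced instances whose primary is node B,
  # needing 512 and 1024 MB, and for one instance whose primary is node C
  # needing 2048 MB. Then A's 'sinst-by-pnode' maps B to the first two
  # instances and C to the third, so A must have at least 1536 MB free to
  # survive a failure of B and at least 2048 MB free to survive a failure of
  # C; otherwise the cluster is reported as not N+1 compliant.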
896

    
897
  def CheckPrereq(self):
898
    """Check prerequisites.
899

900
    Transform the list of checks we're going to skip into a set and check that
901
    all its members are valid.
902

903
    """
904
    self.skip_set = frozenset(self.op.skip_checks)
905
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
906
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
907

    
908
  def BuildHooksEnv(self):
909
    """Build hooks env.
910

911
    Cluster-Verify hooks are run only in the post phase; their failure causes
    the output to be logged in the verify output and the verification to fail.
913

914
    """
915
    all_nodes = self.cfg.GetNodeList()
916
    # TODO: populate the environment with useful information for verify hooks
917
    env = {}
918
    return env, [], all_nodes
919

    
920
  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.
922

923
    """
924
    bad = False
925
    feedback_fn("* Verifying global settings")
926
    for msg in self.cfg.VerifyConfig():
927
      feedback_fn("  - ERROR: %s" % msg)
928

    
929
    vg_name = self.cfg.GetVGName()
930
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
931
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
932
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
933
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
934
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
935
                        for iname in instancelist)
936
    i_non_redundant = [] # Non redundant instances
937
    i_non_a_balanced = [] # Non auto-balanced instances
938
    n_offline = [] # List of offline nodes
939
    n_drained = [] # List of nodes being drained
940
    node_volume = {}
941
    node_instance = {}
942
    node_info = {}
943
    instance_cfg = {}
944

    
945
    # FIXME: verify OS list
946
    # do local checksums
947
    master_files = [constants.CLUSTER_CONF_FILE]
948

    
949
    file_names = ssconf.SimpleStore().GetFileList()
950
    file_names.append(constants.SSL_CERT_FILE)
951
    file_names.append(constants.RAPI_CERT_FILE)
952
    file_names.extend(master_files)
953

    
954
    local_checksums = utils.FingerprintFiles(file_names)
955

    
956
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
957
    node_verify_param = {
958
      constants.NV_FILELIST: file_names,
959
      constants.NV_NODELIST: [node.name for node in nodeinfo
960
                              if not node.offline],
961
      constants.NV_HYPERVISOR: hypervisors,
962
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
963
                                  node.secondary_ip) for node in nodeinfo
964
                                 if not node.offline],
965
      constants.NV_LVLIST: vg_name,
966
      constants.NV_INSTANCELIST: hypervisors,
967
      constants.NV_VGLIST: None,
968
      constants.NV_VERSION: None,
969
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
970
      constants.NV_DRBDLIST: None,
971
      }
972
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
973
                                           self.cfg.GetClusterName())
974

    
975
    cluster = self.cfg.GetClusterInfo()
976
    master_node = self.cfg.GetMasterNode()
977
    all_drbd_map = self.cfg.ComputeDRBDMap()
978

    
979
    for node_i in nodeinfo:
980
      node = node_i.name
981
      nresult = all_nvinfo[node].data
982

    
983
      if node_i.offline:
984
        feedback_fn("* Skipping offline node %s" % (node,))
985
        n_offline.append(node)
986
        continue
987

    
988
      if node == master_node:
989
        ntype = "master"
990
      elif node_i.master_candidate:
991
        ntype = "master candidate"
992
      elif node_i.drained:
993
        ntype = "drained"
994
        n_drained.append(node)
995
      else:
996
        ntype = "regular"
997
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))
998

    
999
      if all_nvinfo[node].failed or not isinstance(nresult, dict):
1000
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
1001
        bad = True
1002
        continue
1003

    
1004
      node_drbd = {}
1005
      for minor, instance in all_drbd_map[node].items():
1006
        instance = instanceinfo[instance]
1007
        node_drbd[minor] = (instance.name, instance.admin_up)
1008
      result = self._VerifyNode(node_i, file_names, local_checksums,
1009
                                nresult, feedback_fn, master_files,
1010
                                node_drbd)
1011
      bad = bad or result
1012

    
1013
      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1014
      if isinstance(lvdata, basestring):
1015
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1016
                    (node, utils.SafeEncode(lvdata)))
1017
        bad = True
1018
        node_volume[node] = {}
1019
      elif not isinstance(lvdata, dict):
1020
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1021
        bad = True
1022
        continue
1023
      else:
1024
        node_volume[node] = lvdata
1025

    
1026
      # node_instance
1027
      idata = nresult.get(constants.NV_INSTANCELIST, None)
1028
      if not isinstance(idata, list):
1029
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1030
                    (node,))
1031
        bad = True
1032
        continue
1033

    
1034
      node_instance[node] = idata
1035

    
1036
      # node_info
1037
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
1038
      if not isinstance(nodeinfo, dict):
1039
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1040
        bad = True
1041
        continue
1042

    
1043
      try:
1044
        node_info[node] = {
1045
          "mfree": int(nodeinfo['memory_free']),
1046
          "dfree": int(nresult[constants.NV_VGLIST][vg_name]),
1047
          "pinst": [],
1048
          "sinst": [],
1049
          # dictionary holding all instances this node is secondary for,
1050
          # grouped by their primary node. Each key is a cluster node, and each
1051
          # value is a list of instances which have the key as primary and the
1052
          # current node as secondary.  this is handy to calculate N+1 memory
1053
          # availability if you can only failover from a primary to its
1054
          # secondary.
1055
          "sinst-by-pnode": {},
1056
        }
1057
      except ValueError:
1058
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
1059
        bad = True
1060
        continue
1061

    
1062
    node_vol_should = {}
1063

    
1064
    for instance in instancelist:
1065
      feedback_fn("* Verifying instance %s" % instance)
1066
      inst_config = instanceinfo[instance]
1067
      result =  self._VerifyInstance(instance, inst_config, node_volume,
1068
                                     node_instance, feedback_fn, n_offline)
1069
      bad = bad or result
1070
      inst_nodes_offline = []
1071

    
1072
      inst_config.MapLVsByNode(node_vol_should)
1073

    
1074
      instance_cfg[instance] = inst_config
1075

    
1076
      pnode = inst_config.primary_node
1077
      if pnode in node_info:
1078
        node_info[pnode]['pinst'].append(instance)
1079
      elif pnode not in n_offline:
1080
        feedback_fn("  - ERROR: instance %s, connection to primary node"
1081
                    " %s failed" % (instance, pnode))
1082
        bad = True
1083

    
1084
      if pnode in n_offline:
1085
        inst_nodes_offline.append(pnode)
1086

    
1087
      # If the instance is non-redundant we cannot survive losing its primary
1088
      # node, so we are not N+1 compliant. On the other hand we have no disk
1089
      # templates with more than one secondary so that situation is not well
1090
      # supported either.
1091
      # FIXME: does not support file-backed instances
1092
      if len(inst_config.secondary_nodes) == 0:
1093
        i_non_redundant.append(instance)
1094
      elif len(inst_config.secondary_nodes) > 1:
1095
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
1096
                    % instance)
1097

    
1098
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1099
        i_non_a_balanced.append(instance)
1100

    
1101
      for snode in inst_config.secondary_nodes:
1102
        if snode in node_info:
1103
          node_info[snode]['sinst'].append(instance)
1104
          if pnode not in node_info[snode]['sinst-by-pnode']:
1105
            node_info[snode]['sinst-by-pnode'][pnode] = []
1106
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1107
        elif snode not in n_offline:
1108
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
1109
                      " %s failed" % (instance, snode))
1110
          bad = True
1111
        if snode in n_offline:
1112
          inst_nodes_offline.append(snode)
1113

    
1114
      if inst_nodes_offline:
1115
        # warn that the instance lives on offline nodes, and set bad=True
1116
        feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1117
                    ", ".join(inst_nodes_offline))
1118
        bad = True
1119

    
1120
    feedback_fn("* Verifying orphan volumes")
1121
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1122
                                       feedback_fn)
1123
    bad = bad or result
1124

    
1125
    feedback_fn("* Verifying remaining instances")
1126
    result = self._VerifyOrphanInstances(instancelist, node_instance,
1127
                                         feedback_fn)
1128
    bad = bad or result
1129

    
1130
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1131
      feedback_fn("* Verifying N+1 Memory redundancy")
1132
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1133
      bad = bad or result
1134

    
1135
    feedback_fn("* Other Notes")
1136
    if i_non_redundant:
1137
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1138
                  % len(i_non_redundant))
1139

    
1140
    if i_non_a_balanced:
1141
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1142
                  % len(i_non_a_balanced))
1143

    
1144
    if n_offline:
1145
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1146

    
1147
    if n_drained:
1148
      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1149

    
1150
    return not bad
1151

    
1152
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result
1154

1155
    This method analyses the hook result, handles it, and sends some
1156
    nicely-formatted feedback back to the user.
1157

1158
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
1159
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1160
    @param hooks_results: the results of the multi-node hooks rpc call
1161
    @param feedback_fn: function used to send feedback back to the caller
1162
    @param lu_result: previous Exec result
1163
    @return: the new Exec result, based on the previous result
1164
        and hook results
1165

1166
    """
1167
    # We only really run POST phase hooks, and are only interested in
1168
    # their results
1169
    if phase == constants.HOOKS_PHASE_POST:
1170
      # Used to change hooks' output to proper indentation
1171
      indent_re = re.compile('^', re.M)
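      # e.g. indent_re.sub('      ', "line1\nline2") == "      line1\n      line2":
      # with re.M the '^' matches at the start of every line, so each line of a
      # failing hook's output is indented under its error header below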
1172
      feedback_fn("* Hooks Results")
1173
      if not hooks_results:
1174
        feedback_fn("  - ERROR: general communication failure")
1175
        lu_result = 1
1176
      else:
1177
        for node_name in hooks_results:
1178
          show_node_header = True
1179
          res = hooks_results[node_name]
1180
          if res.failed or res.data is False or not isinstance(res.data, list):
1181
            if res.offline:
1182
              # no need to warn or set fail return value
1183
              continue
1184
            feedback_fn("    Communication failure in hooks execution")
1185
            lu_result = 1
1186
            continue
1187
          for script, hkr, output in res.data:
1188
            if hkr == constants.HKR_FAIL:
1189
              # The node header is only shown once, if there are
1190
              # failing hooks on that node
1191
              if show_node_header:
1192
                feedback_fn("  Node %s:" % node_name)
1193
                show_node_header = False
1194
              feedback_fn("    ERROR: Script %s failed, output:" % script)
1195
              output = indent_re.sub('      ', output)
1196
              feedback_fn("%s" % output)
1197
              lu_result = 1
1198

    
1199
      return lu_result
1200

    
1201

    
1202
class LUVerifyDisks(NoHooksLU):
1203
  """Verifies the cluster disks status.
1204

1205
  """
1206
  _OP_REQP = []
1207
  REQ_BGL = False
1208

    
1209
  def ExpandNames(self):
1210
    self.needed_locks = {
1211
      locking.LEVEL_NODE: locking.ALL_SET,
1212
      locking.LEVEL_INSTANCE: locking.ALL_SET,
1213
    }
1214
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1215

    
1216
  def CheckPrereq(self):
1217
    """Check prerequisites.
1218

1219
    This has no prerequisites.
1220

1221
    """
1222
    pass
1223

    
1224
  def Exec(self, feedback_fn):
1225
    """Verify integrity of cluster disks.
1226

1227
    """
1228
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1229

    
1230
    vg_name = self.cfg.GetVGName()
1231
    nodes = utils.NiceSort(self.cfg.GetNodeList())
1232
    instances = [self.cfg.GetInstanceInfo(name)
1233
                 for name in self.cfg.GetInstanceList()]
1234

    
1235
    nv_dict = {}
1236
    for inst in instances:
1237
      inst_lvs = {}
1238
      if (not inst.admin_up or
1239
          inst.disk_template not in constants.DTS_NET_MIRROR):
1240
        continue
1241
      inst.MapLVsByNode(inst_lvs)
1242
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1243
      for node, vol_list in inst_lvs.iteritems():
1244
        for vol in vol_list:
1245
          nv_dict[(node, vol)] = inst
1246

    
1247
    if not nv_dict:
1248
      return result
1249

    
1250
    node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1251

    
1252
    to_act = set()
1253
    for node in nodes:
1254
      # node_volume
1255
      lvs = node_lvs[node]
1256
      if lvs.failed:
1257
        if not lvs.offline:
1258
          self.LogWarning("Connection to node %s failed: %s" %
1259
                          (node, lvs.data))
1260
        continue
1261
      lvs = lvs.data
1262
      if isinstance(lvs, basestring):
1263
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1264
        res_nlvm[node] = lvs
1265
      elif not isinstance(lvs, dict):
1266
        logging.warning("Connection to node %s failed or invalid data"
1267
                        " returned", node)
1268
        res_nodes.append(node)
1269
        continue
1270

    
1271
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1272
        inst = nv_dict.pop((node, lv_name), None)
1273
        if (not lv_online and inst is not None
1274
            and inst.name not in res_instances):
1275
          res_instances.append(inst.name)
1276

    
1277
    # any leftover items in nv_dict are missing LVs, let's arrange the
1278
    # data better
1279
    for key, inst in nv_dict.iteritems():
1280
      if inst.name not in res_missing:
1281
        res_missing[inst.name] = []
1282
      res_missing[inst.name].append(key)
1283

    
1284
    return result
1285

    
1286

    
1287
class LURenameCluster(LogicalUnit):
1288
  """Rename the cluster.
1289

1290
  """
1291
  HPATH = "cluster-rename"
1292
  HTYPE = constants.HTYPE_CLUSTER
1293
  _OP_REQP = ["name"]
1294

    
1295
  def BuildHooksEnv(self):
1296
    """Build hooks env.
1297

1298
    """
1299
    env = {
1300
      "OP_TARGET": self.cfg.GetClusterName(),
1301
      "NEW_NAME": self.op.name,
1302
      }
1303
    mn = self.cfg.GetMasterNode()
1304
    return env, [mn], [mn]
1305

    
1306
  def CheckPrereq(self):
1307
    """Verify that the passed name is a valid one.
1308

1309
    """
1310
    hostname = utils.HostInfo(self.op.name)
1311

    
1312
    new_name = hostname.name
1313
    self.ip = new_ip = hostname.ip
1314
    old_name = self.cfg.GetClusterName()
1315
    old_ip = self.cfg.GetMasterIP()
1316
    if new_name == old_name and new_ip == old_ip:
1317
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1318
                                 " cluster has changed")
1319
    if new_ip != old_ip:
1320
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1321
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1322
                                   " reachable on the network. Aborting." %
1323
                                   new_ip)
1324

    
1325
    self.op.name = new_name
1326

    
1327
  def Exec(self, feedback_fn):
1328
    """Rename the cluster.
1329

1330
    """
1331
    clustername = self.op.name
1332
    ip = self.ip
1333

    
1334
    # shutdown the master IP
1335
    master = self.cfg.GetMasterNode()
1336
    result = self.rpc.call_node_stop_master(master, False)
1337
    if result.failed or not result.data:
1338
      raise errors.OpExecError("Could not disable the master role")
1339

    
1340
    try:
1341
      cluster = self.cfg.GetClusterInfo()
1342
      cluster.cluster_name = clustername
1343
      cluster.master_ip = ip
1344
      self.cfg.Update(cluster)
1345

    
1346
      # update the known hosts file
1347
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1348
      node_list = self.cfg.GetNodeList()
1349
      try:
1350
        node_list.remove(master)
1351
      except ValueError:
1352
        pass
1353
      result = self.rpc.call_upload_file(node_list,
1354
                                         constants.SSH_KNOWN_HOSTS_FILE)
1355
      for to_node, to_result in result.iteritems():
1356
        if to_result.failed or not to_result.data:
1357
          logging.error("Copy of file %s to node %s failed",
1358
                        constants.SSH_KNOWN_HOSTS_FILE, to_node)
1359

    
1360
    finally:
1361
      result = self.rpc.call_node_start_master(master, False)
1362
      if result.failed or not result.data:
1363
        self.LogWarning("Could not re-enable the master role on"
1364
                        " the master, please restart manually.")
1365

    
1366

    
1367
def _RecursiveCheckIfLVMBased(disk):
1368
  """Check if the given disk or its children are lvm-based.
1369

1370
  @type disk: L{objects.Disk}
1371
  @param disk: the disk to check
1372
  @rtype: boolean
1373
  @return: boolean indicating whether a LD_LV dev_type was found or not
1374

1375
  """
1376
  if disk.children:
1377
    for chdisk in disk.children:
1378
      if _RecursiveCheckIfLVMBased(chdisk):
1379
        return True
1380
  return disk.dev_type == constants.LD_LV
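
# Illustrative only (construction details are an assumption, not taken from
# this module): a DRBD8 disk whose children are logical volumes is reported
# as LVM-based, e.g.
#
#   lv1 = objects.Disk(dev_type=constants.LD_LV, children=[])
#   lv2 = objects.Disk(dev_type=constants.LD_LV, children=[])
#   drbd = objects.Disk(dev_type=constants.LD_DRBD8, children=[lv1, lv2])
#   _RecursiveCheckIfLVMBased(drbd)  # True
#
# whereas a file-based disk with no LV children would return False.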
1381

    
1382

    
1383
class LUSetClusterParams(LogicalUnit):
1384
  """Change the parameters of the cluster.
1385

1386
  """
1387
  HPATH = "cluster-modify"
1388
  HTYPE = constants.HTYPE_CLUSTER
1389
  _OP_REQP = []
1390
  REQ_BGL = False
1391

    
1392
  def CheckParameters(self):
1393
    """Check parameters
1394

1395
    """
1396
    if not hasattr(self.op, "candidate_pool_size"):
1397
      self.op.candidate_pool_size = None
1398
    if self.op.candidate_pool_size is not None:
1399
      try:
1400
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1401
      except ValueError, err:
1402
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1403
                                   str(err))
1404
      if self.op.candidate_pool_size < 1:
1405
        raise errors.OpPrereqError("At least one master candidate needed")
1406

    
1407
  def ExpandNames(self):
1408
    # FIXME: in the future maybe other cluster params won't require checking on
1409
    # all nodes to be modified.
1410
    self.needed_locks = {
1411
      locking.LEVEL_NODE: locking.ALL_SET,
1412
    }
1413
    self.share_locks[locking.LEVEL_NODE] = 1
1414

    
1415
  def BuildHooksEnv(self):
1416
    """Build hooks env.
1417

1418
    """
1419
    env = {
1420
      "OP_TARGET": self.cfg.GetClusterName(),
1421
      "NEW_VG_NAME": self.op.vg_name,
1422
      }
1423
    mn = self.cfg.GetMasterNode()
1424
    return env, [mn], [mn]
1425

    
1426
  def CheckPrereq(self):
1427
    """Check prerequisites.
1428

1429
    This checks whether the given params don't conflict and
1430
    if the given volume group is valid.
1431

1432
    """
1433
    if self.op.vg_name is not None and not self.op.vg_name:
1434
      instances = self.cfg.GetAllInstancesInfo().values()
1435
      for inst in instances:
1436
        for disk in inst.disks:
1437
          if _RecursiveCheckIfLVMBased(disk):
1438
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1439
                                       " lvm-based instances exist")
1440

    
1441
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1442

    
1443
    # if vg_name not None, checks given volume group on all nodes
1444
    if self.op.vg_name:
1445
      vglist = self.rpc.call_vg_list(node_list)
1446
      for node in node_list:
1447
        if vglist[node].failed:
1448
          # ignoring down node
1449
          self.LogWarning("Node %s unreachable/error, ignoring" % node)
1450
          continue
1451
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1452
                                              self.op.vg_name,
1453
                                              constants.MIN_VG_SIZE)
1454
        if vgstatus:
1455
          raise errors.OpPrereqError("Error on node '%s': %s" %
1456
                                     (node, vgstatus))
1457

    
1458
    self.cluster = cluster = self.cfg.GetClusterInfo()
1459
    # validate beparams changes
1460
    if self.op.beparams:
1461
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1462
      self.new_beparams = cluster.FillDict(
1463
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1464

    
1465
    # hypervisor list/parameters
1466
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1467
    if self.op.hvparams:
1468
      if not isinstance(self.op.hvparams, dict):
1469
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1470
      for hv_name, hv_dict in self.op.hvparams.items():
1471
        if hv_name not in self.new_hvparams:
1472
          self.new_hvparams[hv_name] = hv_dict
1473
        else:
1474
          self.new_hvparams[hv_name].update(hv_dict)
1475

    
1476
    if self.op.enabled_hypervisors is not None:
1477
      self.hv_list = self.op.enabled_hypervisors
1478
    else:
1479
      self.hv_list = cluster.enabled_hypervisors
1480

    
1481
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1482
      # either the enabled list has changed, or the parameters have, validate
1483
      for hv_name, hv_params in self.new_hvparams.items():
1484
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1485
            (self.op.enabled_hypervisors and
1486
             hv_name in self.op.enabled_hypervisors)):
1487
          # either this is a new hypervisor, or its parameters have changed
1488
          hv_class = hypervisor.GetHypervisor(hv_name)
1489
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1490
          hv_class.CheckParameterSyntax(hv_params)
1491
          _CheckHVParams(self, node_list, hv_name, hv_params)
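    # Summary of the block above: the submitted hvparams are merged on top
    # of the current cluster hvparams, and only hypervisors whose parameters
    # were submitted or that appear in the new enabled list are re-validated
    # (syntax locally, then on the locked nodes via _CheckHVParams).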
1492

    
1493
  def Exec(self, feedback_fn):
1494
    """Change the parameters of the cluster.
1495

1496
    """
1497
    if self.op.vg_name is not None:
1498
      if self.op.vg_name != self.cfg.GetVGName():
1499
        self.cfg.SetVGName(self.op.vg_name)
1500
      else:
1501
        feedback_fn("Cluster LVM configuration already in desired"
1502
                    " state, not changing")
1503
    if self.op.hvparams:
1504
      self.cluster.hvparams = self.new_hvparams
1505
    if self.op.enabled_hypervisors is not None:
1506
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1507
    if self.op.beparams:
1508
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1509
    if self.op.candidate_pool_size is not None:
1510
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1511

    
1512
    self.cfg.Update(self.cluster)
1513

    
1514
    # we want to update nodes after the cluster so that if any errors
1515
    # happen, we have recorded and saved the cluster info
1516
    if self.op.candidate_pool_size is not None:
1517
      _AdjustCandidatePool(self)
1518

    
1519

    
1520
class LURedistributeConfig(NoHooksLU):
1521
  """Force the redistribution of cluster configuration.
1522

1523
  This is a very simple LU.
1524

1525
  """
1526
  _OP_REQP = []
1527
  REQ_BGL = False
1528

    
1529
  def ExpandNames(self):
1530
    self.needed_locks = {
1531
      locking.LEVEL_NODE: locking.ALL_SET,
1532
    }
1533
    self.share_locks[locking.LEVEL_NODE] = 1
1534

    
1535
  def CheckPrereq(self):
1536
    """Check prerequisites.
1537

1538
    """
1539

    
1540
  def Exec(self, feedback_fn):
1541
    """Redistribute the configuration.
1542

1543
    """
1544
    self.cfg.Update(self.cfg.GetClusterInfo())
1545

    
1546

    
1547
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1548
  """Sleep and poll for an instance's disk to sync.
1549

1550
  """
1551
  if not instance.disks:
1552
    return True
1553

    
1554
  if not oneshot:
1555
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1556

    
1557
  node = instance.primary_node
1558

    
1559
  for dev in instance.disks:
1560
    lu.cfg.SetDiskID(dev, node)
1561

    
1562
  retries = 0
1563
  while True:
1564
    max_time = 0
1565
    done = True
1566
    cumul_degraded = False
1567
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1568
    if rstats.failed or not rstats.data:
1569
      lu.LogWarning("Can't get any data from node %s", node)
1570
      retries += 1
1571
      if retries >= 10:
1572
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1573
                                 " aborting." % node)
1574
      time.sleep(6)
1575
      continue
1576
    rstats = rstats.data
1577
    retries = 0
1578
    for i, mstat in enumerate(rstats):
1579
      if mstat is None:
1580
        lu.LogWarning("Can't compute data for node %s/%s",
1581
                      node, instance.disks[i].iv_name)
1582
        continue
1583
      # we ignore the ldisk parameter
1584
      perc_done, est_time, is_degraded, _ = mstat
1585
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1586
      if perc_done is not None:
1587
        done = False
1588
        if est_time is not None:
1589
          rem_time = "%d estimated seconds remaining" % est_time
1590
          max_time = est_time
1591
        else:
1592
          rem_time = "no time estimate"
1593
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1594
                        (instance.disks[i].iv_name, perc_done, rem_time))
1595
    if done or oneshot:
1596
      break
1597

    
1598
    time.sleep(min(60, max_time))
1599

    
1600
  if done:
1601
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1602
  return not cumul_degraded
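# Note on the return value: True means no disk ended up degraded without a
# resync in progress; False means at least one disk reported is_degraded
# while showing no completion percentage (perc_done is None).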
1603

    
1604

    
1605
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1606
  """Check that mirrors are not degraded.
1607

1608
  The ldisk parameter, if True, will change the test from the
1609
  is_degraded attribute (which represents overall non-ok status for
1610
  the device(s)) to the ldisk (representing the local storage status).
1611

1612
  """
1613
  lu.cfg.SetDiskID(dev, node)
1614
  if ldisk:
1615
    idx = 6
1616
  else:
1617
    idx = 5
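  # The index chosen above selects a field of the blockdev_find payload:
  # index 5 is the overall is_degraded flag, index 6 the ldisk (local
  # storage) status, as described in the docstring.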
1618

    
1619
  result = True
1620
  if on_primary or dev.AssembleOnSecondary():
1621
    rstats = lu.rpc.call_blockdev_find(node, dev)
1622
    msg = rstats.RemoteFailMsg()
1623
    if msg:
1624
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1625
      result = False
1626
    elif not rstats.payload:
1627
      lu.LogWarning("Can't find disk on node %s", node)
1628
      result = False
1629
    else:
1630
      result = result and (not rstats.payload[idx])
1631
  if dev.children:
1632
    for child in dev.children:
1633
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1634

    
1635
  return result
1636

    
1637

    
1638
class LUDiagnoseOS(NoHooksLU):
1639
  """Logical unit for OS diagnose/query.
1640

1641
  """
1642
  _OP_REQP = ["output_fields", "names"]
1643
  REQ_BGL = False
1644
  _FIELDS_STATIC = utils.FieldSet()
1645
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1646

    
1647
  def ExpandNames(self):
1648
    if self.op.names:
1649
      raise errors.OpPrereqError("Selective OS query not supported")
1650

    
1651
    _CheckOutputFields(static=self._FIELDS_STATIC,
1652
                       dynamic=self._FIELDS_DYNAMIC,
1653
                       selected=self.op.output_fields)
1654

    
1655
    # Lock all nodes, in shared mode
1656
    self.needed_locks = {}
1657
    self.share_locks[locking.LEVEL_NODE] = 1
1658
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1659

    
1660
  def CheckPrereq(self):
1661
    """Check prerequisites.
1662

1663
    """
1664

    
1665
  @staticmethod
1666
  def _DiagnoseByOS(node_list, rlist):
1667
    """Remaps a per-node return list into an a per-os per-node dictionary
1668

1669
    @param node_list: a list with the names of all nodes
1670
    @param rlist: a map with node names as keys and OS objects as values
1671

1672
    @rtype: dict
1673
    @return: a dictionary with OS names as keys and as value another map, with
1674
        nodes as keys and list of OS objects as values, eg::
1675

1676
          {"debian-etch": {"node1": [<object>,...],
1677
                           "node2": [<object>,]}
1678
          }
1679

1680
    """
1681
    all_os = {}
1682
    for node_name, nr in rlist.iteritems():
1683
      if nr.failed or not nr.data:
1684
        continue
1685
      for os_obj in nr.data:
1686
        if os_obj.name not in all_os:
1687
          # build a list of nodes for this os containing empty lists
1688
          # for each node in node_list
1689
          all_os[os_obj.name] = {}
1690
          for nname in node_list:
1691
            all_os[os_obj.name][nname] = []
1692
        all_os[os_obj.name][node_name].append(os_obj)
1693
    return all_os
1694

    
1695
  def Exec(self, feedback_fn):
1696
    """Compute the list of OSes.
1697

1698
    """
1699
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1700
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()
1701
                   if node in node_list]
1702
    node_data = self.rpc.call_os_diagnose(valid_nodes)
1703
    if node_data == False:
1704
      raise errors.OpExecError("Can't gather the list of OSes")
1705
    pol = self._DiagnoseByOS(valid_nodes, node_data)
1706
    output = []
1707
    for os_name, os_data in pol.iteritems():
1708
      row = []
1709
      for field in self.op.output_fields:
1710
        if field == "name":
1711
          val = os_name
1712
        elif field == "valid":
1713
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1714
        elif field == "node_status":
1715
          val = {}
1716
          for node_name, nos_list in os_data.iteritems():
1717
            val[node_name] = [(v.status, v.path) for v in nos_list]
1718
        else:
1719
          raise errors.ParameterError(field)
1720
        row.append(val)
1721
      output.append(row)
1722

    
1723
    return output
1724

    
1725

    
1726
class LURemoveNode(LogicalUnit):
1727
  """Logical unit for removing a node.
1728

1729
  """
1730
  HPATH = "node-remove"
1731
  HTYPE = constants.HTYPE_NODE
1732
  _OP_REQP = ["node_name"]
1733

    
1734
  def BuildHooksEnv(self):
1735
    """Build hooks env.
1736

1737
    This doesn't run on the target node in the pre phase as a failed
1738
    node would then be impossible to remove.
1739

1740
    """
1741
    env = {
1742
      "OP_TARGET": self.op.node_name,
1743
      "NODE_NAME": self.op.node_name,
1744
      }
1745
    all_nodes = self.cfg.GetNodeList()
1746
    all_nodes.remove(self.op.node_name)
1747
    return env, all_nodes, all_nodes
1748

    
1749
  def CheckPrereq(self):
1750
    """Check prerequisites.
1751

1752
    This checks:
1753
     - the node exists in the configuration
1754
     - it does not have primary or secondary instances
1755
     - it's not the master
1756

1757
    Any errors are signalled by raising errors.OpPrereqError.
1758

1759
    """
1760
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1761
    if node is None:
1762
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1763

    
1764
    instance_list = self.cfg.GetInstanceList()
1765

    
1766
    masternode = self.cfg.GetMasterNode()
1767
    if node.name == masternode:
1768
      raise errors.OpPrereqError("Node is the master node,"
1769
                                 " you need to failover first.")
1770

    
1771
    for instance_name in instance_list:
1772
      instance = self.cfg.GetInstanceInfo(instance_name)
1773
      if node.name in instance.all_nodes:
1774
        raise errors.OpPrereqError("Instance %s is still running on the node,"
1775
                                   " please remove first." % instance_name)
1776
    self.op.node_name = node.name
1777
    self.node = node
1778

    
1779
  def Exec(self, feedback_fn):
1780
    """Removes the node from the cluster.
1781

1782
    """
1783
    node = self.node
1784
    logging.info("Stopping the node daemon and removing configs from node %s",
1785
                 node.name)
1786

    
1787
    self.context.RemoveNode(node.name)
1788

    
1789
    self.rpc.call_node_leave_cluster(node.name)
1790

    
1791
    # Promote nodes to master candidate as needed
1792
    _AdjustCandidatePool(self)
1793

    
1794

    
1795
class LUQueryNodes(NoHooksLU):
1796
  """Logical unit for querying nodes.
1797

1798
  """
1799
  _OP_REQP = ["output_fields", "names", "use_locking"]
1800
  REQ_BGL = False
1801
  _FIELDS_DYNAMIC = utils.FieldSet(
1802
    "dtotal", "dfree",
1803
    "mtotal", "mnode", "mfree",
1804
    "bootid",
1805
    "ctotal", "cnodes", "csockets",
1806
    )
1807

    
1808
  _FIELDS_STATIC = utils.FieldSet(
1809
    "name", "pinst_cnt", "sinst_cnt",
1810
    "pinst_list", "sinst_list",
1811
    "pip", "sip", "tags",
1812
    "serial_no",
1813
    "master_candidate",
1814
    "master",
1815
    "offline",
1816
    "drained",
1817
    )
1818

    
1819
  def ExpandNames(self):
1820
    _CheckOutputFields(static=self._FIELDS_STATIC,
1821
                       dynamic=self._FIELDS_DYNAMIC,
1822
                       selected=self.op.output_fields)
1823

    
1824
    self.needed_locks = {}
1825
    self.share_locks[locking.LEVEL_NODE] = 1
1826

    
1827
    if self.op.names:
1828
      self.wanted = _GetWantedNodes(self, self.op.names)
1829
    else:
1830
      self.wanted = locking.ALL_SET
1831

    
1832
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1833
    self.do_locking = self.do_node_query and self.op.use_locking
1834
    if self.do_locking:
1835
      # if we don't request only static fields, we need to lock the nodes
1836
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
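    # Conversely, when only static fields are requested, or use_locking is
    # not set, no node locks are acquired; in the latter case live data is
    # still gathered, just without locking the nodes.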
1837

    
1838

    
1839
  def CheckPrereq(self):
1840
    """Check prerequisites.
1841

1842
    """
1843
    # The validation of the node list is done in the _GetWantedNodes,
1844
    # if non empty, and if empty, there's no validation to do
1845
    pass
1846

    
1847
  def Exec(self, feedback_fn):
1848
    """Computes the list of nodes and their attributes.
1849

1850
    """
1851
    all_info = self.cfg.GetAllNodesInfo()
1852
    if self.do_locking:
1853
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1854
    elif self.wanted != locking.ALL_SET:
1855
      nodenames = self.wanted
1856
      missing = set(nodenames).difference(all_info.keys())
1857
      if missing:
1858
        raise errors.OpExecError(
1859
          "Some nodes were removed before retrieving their data: %s" % missing)
1860
    else:
1861
      nodenames = all_info.keys()
1862

    
1863
    nodenames = utils.NiceSort(nodenames)
1864
    nodelist = [all_info[name] for name in nodenames]
1865

    
1866
    # begin data gathering
1867

    
1868
    if self.do_node_query:
1869
      live_data = {}
1870
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1871
                                          self.cfg.GetHypervisorType())
1872
      for name in nodenames:
1873
        nodeinfo = node_data[name]
1874
        if not nodeinfo.failed and nodeinfo.data:
1875
          nodeinfo = nodeinfo.data
1876
          fn = utils.TryConvert
1877
          live_data[name] = {
1878
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1879
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1880
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
1881
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1882
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
1883
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1884
            "bootid": nodeinfo.get('bootid', None),
1885
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
1886
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
1887
            }
1888
        else:
1889
          live_data[name] = {}
1890
    else:
1891
      live_data = dict.fromkeys(nodenames, {})
1892

    
1893
    node_to_primary = dict([(name, set()) for name in nodenames])
1894
    node_to_secondary = dict([(name, set()) for name in nodenames])
1895

    
1896
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1897
                             "sinst_cnt", "sinst_list"))
1898
    if inst_fields & frozenset(self.op.output_fields):
1899
      instancelist = self.cfg.GetInstanceList()
1900

    
1901
      for instance_name in instancelist:
1902
        inst = self.cfg.GetInstanceInfo(instance_name)
1903
        if inst.primary_node in node_to_primary:
1904
          node_to_primary[inst.primary_node].add(inst.name)
1905
        for secnode in inst.secondary_nodes:
1906
          if secnode in node_to_secondary:
1907
            node_to_secondary[secnode].add(inst.name)
1908

    
1909
    master_node = self.cfg.GetMasterNode()
1910

    
1911
    # end data gathering
1912

    
1913
    output = []
1914
    for node in nodelist:
1915
      node_output = []
1916
      for field in self.op.output_fields:
1917
        if field == "name":
1918
          val = node.name
1919
        elif field == "pinst_list":
1920
          val = list(node_to_primary[node.name])
1921
        elif field == "sinst_list":
1922
          val = list(node_to_secondary[node.name])
1923
        elif field == "pinst_cnt":
1924
          val = len(node_to_primary[node.name])
1925
        elif field == "sinst_cnt":
1926
          val = len(node_to_secondary[node.name])
1927
        elif field == "pip":
1928
          val = node.primary_ip
1929
        elif field == "sip":
1930
          val = node.secondary_ip
1931
        elif field == "tags":
1932
          val = list(node.GetTags())
1933
        elif field == "serial_no":
1934
          val = node.serial_no
1935
        elif field == "master_candidate":
1936
          val = node.master_candidate
1937
        elif field == "master":
1938
          val = node.name == master_node
1939
        elif field == "offline":
1940
          val = node.offline
1941
        elif field == "drained":
1942
          val = node.drained
1943
        elif self._FIELDS_DYNAMIC.Matches(field):
1944
          val = live_data[node.name].get(field, None)
1945
        else:
1946
          raise errors.ParameterError(field)
1947
        node_output.append(val)
1948
      output.append(node_output)
1949

    
1950
    return output
1951

    
1952

    
1953
class LUQueryNodeVolumes(NoHooksLU):
1954
  """Logical unit for getting volumes on node(s).
1955

1956
  """
1957
  _OP_REQP = ["nodes", "output_fields"]
1958
  REQ_BGL = False
1959
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1960
  _FIELDS_STATIC = utils.FieldSet("node")
1961

    
1962
  def ExpandNames(self):
1963
    _CheckOutputFields(static=self._FIELDS_STATIC,
1964
                       dynamic=self._FIELDS_DYNAMIC,
1965
                       selected=self.op.output_fields)
1966

    
1967
    self.needed_locks = {}
1968
    self.share_locks[locking.LEVEL_NODE] = 1
1969
    if not self.op.nodes:
1970
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1971
    else:
1972
      self.needed_locks[locking.LEVEL_NODE] = \
1973
        _GetWantedNodes(self, self.op.nodes)
1974

    
1975
  def CheckPrereq(self):
1976
    """Check prerequisites.
1977

1978
    This checks that the fields required are valid output fields.
1979

1980
    """
1981
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1982

    
1983
  def Exec(self, feedback_fn):
1984
    """Computes the list of nodes and their attributes.
1985

1986
    """
1987
    nodenames = self.nodes
1988
    volumes = self.rpc.call_node_volumes(nodenames)
1989

    
1990
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1991
             in self.cfg.GetInstanceList()]
1992

    
1993
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1994

    
1995
    output = []
1996
    for node in nodenames:
1997
      if node not in volumes or volumes[node].failed or not volumes[node].data:
1998
        continue
1999

    
2000
      node_vols = volumes[node].data[:]
2001
      node_vols.sort(key=lambda vol: vol['dev'])
2002

    
2003
      for vol in node_vols:
2004
        node_output = []
2005
        for field in self.op.output_fields:
2006
          if field == "node":
2007
            val = node
2008
          elif field == "phys":
2009
            val = vol['dev']
2010
          elif field == "vg":
2011
            val = vol['vg']
2012
          elif field == "name":
2013
            val = vol['name']
2014
          elif field == "size":
2015
            val = int(float(vol['size']))
2016
          elif field == "instance":
2017
            for inst in ilist:
2018
              if node not in lv_by_node[inst]:
2019
                continue
2020
              if vol['name'] in lv_by_node[inst][node]:
2021
                val = inst.name
2022
                break
2023
            else:
2024
              val = '-'
2025
          else:
2026
            raise errors.ParameterError(field)
2027
          node_output.append(str(val))
2028

    
2029
        output.append(node_output)
2030

    
2031
    return output
2032

    
2033

    
2034
class LUAddNode(LogicalUnit):
2035
  """Logical unit for adding node to the cluster.
2036

2037
  """
2038
  HPATH = "node-add"
2039
  HTYPE = constants.HTYPE_NODE
2040
  _OP_REQP = ["node_name"]
2041

    
2042
  def BuildHooksEnv(self):
2043
    """Build hooks env.
2044

2045
    This will run on all nodes before, and on all nodes + the new node after.
2046

2047
    """
2048
    env = {
2049
      "OP_TARGET": self.op.node_name,
2050
      "NODE_NAME": self.op.node_name,
2051
      "NODE_PIP": self.op.primary_ip,
2052
      "NODE_SIP": self.op.secondary_ip,
2053
      }
2054
    nodes_0 = self.cfg.GetNodeList()
2055
    nodes_1 = nodes_0 + [self.op.node_name, ]
2056
    return env, nodes_0, nodes_1
2057

    
2058
  def CheckPrereq(self):
2059
    """Check prerequisites.
2060

2061
    This checks:
2062
     - the new node is not already in the config
2063
     - it is resolvable
2064
     - its parameters (single/dual homed) match the cluster
2065

2066
    Any errors are signalled by raising errors.OpPrereqError.
2067

2068
    """
2069
    node_name = self.op.node_name
2070
    cfg = self.cfg
2071

    
2072
    dns_data = utils.HostInfo(node_name)
2073

    
2074
    node = dns_data.name
2075
    primary_ip = self.op.primary_ip = dns_data.ip
2076
    secondary_ip = getattr(self.op, "secondary_ip", None)
2077
    if secondary_ip is None:
2078
      secondary_ip = primary_ip
2079
    if not utils.IsValidIP(secondary_ip):
2080
      raise errors.OpPrereqError("Invalid secondary IP given")
2081
    self.op.secondary_ip = secondary_ip
2082

    
2083
    node_list = cfg.GetNodeList()
2084
    if not self.op.readd and node in node_list:
2085
      raise errors.OpPrereqError("Node %s is already in the configuration" %
2086
                                 node)
2087
    elif self.op.readd and node not in node_list:
2088
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2089

    
2090
    for existing_node_name in node_list:
2091
      existing_node = cfg.GetNodeInfo(existing_node_name)
2092

    
2093
      if self.op.readd and node == existing_node_name:
2094
        if (existing_node.primary_ip != primary_ip or
2095
            existing_node.secondary_ip != secondary_ip):
2096
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
2097
                                     " address configuration as before")
2098
        continue
2099

    
2100
      if (existing_node.primary_ip == primary_ip or
2101
          existing_node.secondary_ip == primary_ip or
2102
          existing_node.primary_ip == secondary_ip or
2103
          existing_node.secondary_ip == secondary_ip):
2104
        raise errors.OpPrereqError("New node ip address(es) conflict with"
2105
                                   " existing node %s" % existing_node.name)
2106

    
2107
    # check that the type of the node (single versus dual homed) is the
2108
    # same as for the master
2109
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2110
    master_singlehomed = myself.secondary_ip == myself.primary_ip
2111
    newbie_singlehomed = secondary_ip == primary_ip
2112
    if master_singlehomed != newbie_singlehomed:
2113
      if master_singlehomed:
2114
        raise errors.OpPrereqError("The master has no private ip but the"
2115
                                   " new node has one")
2116
      else:
2117
        raise errors.OpPrereqError("The master has a private ip but the"
2118
                                   " new node doesn't have one")
2119

    
2120
    # check reachability
2121
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2122
      raise errors.OpPrereqError("Node not reachable by ping")
2123

    
2124
    if not newbie_singlehomed:
2125
      # check reachability from my secondary ip to newbie's secondary ip
2126
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2127
                           source=myself.secondary_ip):
2128
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2129
                                   " based ping to noded port")
2130

    
2131
    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2132
    mc_now, _ = self.cfg.GetMasterCandidateStats()
2133
    master_candidate = mc_now < cp_size
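    # The new node is added as a master candidate only while the current
    # number of candidates is below candidate_pool_size; otherwise it joins
    # as a regular node.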
2134

    
2135
    self.new_node = objects.Node(name=node,
2136
                                 primary_ip=primary_ip,
2137
                                 secondary_ip=secondary_ip,
2138
                                 master_candidate=master_candidate,
2139
                                 offline=False, drained=False)
2140

    
2141
  def Exec(self, feedback_fn):
2142
    """Adds the new node to the cluster.
2143

2144
    """
2145
    new_node = self.new_node
2146
    node = new_node.name
2147

    
2148
    # check connectivity
2149
    result = self.rpc.call_version([node])[node]
2150
    result.Raise()
2151
    if result.data:
2152
      if constants.PROTOCOL_VERSION == result.data:
2153
        logging.info("Communication to node %s fine, sw version %s match",
2154
                     node, result.data)
2155
      else:
2156
        raise errors.OpExecError("Version mismatch master version %s,"
2157
                                 " node version %s" %
2158
                                 (constants.PROTOCOL_VERSION, result.data))
2159
    else:
2160
      raise errors.OpExecError("Cannot get version from the new node")
2161

    
2162
    # setup ssh on node
2163
    logging.info("Copy ssh key to node %s", node)
2164
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2165
    keyarray = []
2166
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2167
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2168
                priv_key, pub_key]
2169

    
2170
    for i in keyfiles:
2171
      f = open(i, 'r')
2172
      try:
2173
        keyarray.append(f.read())
2174
      finally:
2175
        f.close()
2176

    
2177
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2178
                                    keyarray[2],
2179
                                    keyarray[3], keyarray[4], keyarray[5])
2180

    
2181
    msg = result.RemoteFailMsg()
2182
    if msg:
2183
      raise errors.OpExecError("Cannot transfer ssh keys to the"
2184
                               " new node: %s" % msg)
2185

    
2186
    # Add node to our /etc/hosts, and add key to known_hosts
2187
    utils.AddHostToEtcHosts(new_node.name)
2188

    
2189
    if new_node.secondary_ip != new_node.primary_ip:
2190
      result = self.rpc.call_node_has_ip_address(new_node.name,
2191
                                                 new_node.secondary_ip)
2192
      if result.failed or not result.data:
2193
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2194
                                 " you gave (%s). Please fix and re-run this"
2195
                                 " command." % new_node.secondary_ip)
2196

    
2197
    node_verify_list = [self.cfg.GetMasterNode()]
2198
    node_verify_param = {
2199
      'nodelist': [node],
2200
      # TODO: do a node-net-test as well?
2201
    }
2202

    
2203
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2204
                                       self.cfg.GetClusterName())
2205
    for verifier in node_verify_list:
2206
      if result[verifier].failed or not result[verifier].data:
2207
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
2208
                                 " for remote verification" % verifier)
2209
      if result[verifier].data['nodelist']:
2210
        for failed in result[verifier].data['nodelist']:
2211
          feedback_fn("ssh/hostname verification failed %s -> %s" %
2212
                      (verifier, result[verifier].data['nodelist'][failed]))
2213
        raise errors.OpExecError("ssh/hostname verification failed.")
2214

    
2215
    # Distribute updated /etc/hosts and known_hosts to all nodes,
2216
    # including the node just added
2217
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2218
    dist_nodes = self.cfg.GetNodeList()
2219
    if not self.op.readd:
2220
      dist_nodes.append(node)
2221
    if myself.name in dist_nodes:
2222
      dist_nodes.remove(myself.name)
2223

    
2224
    logging.debug("Copying hosts and known_hosts to all nodes")
2225
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2226
      result = self.rpc.call_upload_file(dist_nodes, fname)
2227
      for to_node, to_result in result.iteritems():
2228
        if to_result.failed or not to_result.data:
2229
          logging.error("Copy of file %s to node %s failed", fname, to_node)
2230

    
2231
    to_copy = []
2232
    enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2233
    if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
2234
      to_copy.append(constants.VNC_PASSWORD_FILE)
2235

    
2236
    for fname in to_copy:
2237
      result = self.rpc.call_upload_file([node], fname)
2238
      if result[node].failed or not result[node]:
2239
        logging.error("Could not copy file %s to node %s", fname, node)
2240

    
2241
    if self.op.readd:
2242
      self.context.ReaddNode(new_node)
2243
    else:
2244
      self.context.AddNode(new_node)
2245

    
2246

    
2247
class LUSetNodeParams(LogicalUnit):
2248
  """Modifies the parameters of a node.
2249

2250
  """
2251
  HPATH = "node-modify"
2252
  HTYPE = constants.HTYPE_NODE
2253
  _OP_REQP = ["node_name"]
2254
  REQ_BGL = False
2255

    
2256
  def CheckArguments(self):
2257
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
2258
    if node_name is None:
2259
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2260
    self.op.node_name = node_name
2261
    _CheckBooleanOpField(self.op, 'master_candidate')
2262
    _CheckBooleanOpField(self.op, 'offline')
2263
    _CheckBooleanOpField(self.op, 'drained')
2264
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2265
    if all_mods.count(None) == 3:
2266
      raise errors.OpPrereqError("Please pass at least one modification")
2267
    if all_mods.count(True) > 1:
2268
      raise errors.OpPrereqError("Can't set the node into more than one"
2269
                                 " state at the same time")
2270

    
2271
  def ExpandNames(self):
2272
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2273

    
2274
  def BuildHooksEnv(self):
2275
    """Build hooks env.
2276

2277
    This runs on the master node.
2278

2279
    """
2280
    env = {
2281
      "OP_TARGET": self.op.node_name,
2282
      "MASTER_CANDIDATE": str(self.op.master_candidate),
2283
      "OFFLINE": str(self.op.offline),
2284
      "DRAINED": str(self.op.drained),
2285
      }
2286
    nl = [self.cfg.GetMasterNode(),
2287
          self.op.node_name]
2288
    return env, nl, nl
2289

    
2290
  def CheckPrereq(self):
2291
    """Check prerequisites.
2292

2293
    This only checks the instance list against the existing names.
2294

2295
    """
2296
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2297

    
2298
    if ((self.op.master_candidate == False or self.op.offline == True or
2299
         self.op.drained == True) and node.master_candidate):
2300
      # we will demote the node from master_candidate
2301
      if self.op.node_name == self.cfg.GetMasterNode():
2302
        raise errors.OpPrereqError("The master node has to be a"
2303
                                   " master candidate, online and not drained")
2304
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2305
      num_candidates, _ = self.cfg.GetMasterCandidateStats()
2306
      if num_candidates <= cp_size:
2307
        msg = ("Not enough master candidates (desired"
2308
               " %d, new value will be %d)" % (cp_size, num_candidates-1))
2309
        if self.op.force:
2310
          self.LogWarning(msg)
2311
        else:
2312
          raise errors.OpPrereqError(msg)
2313

    
2314
    if (self.op.master_candidate == True and
2315
        ((node.offline and not self.op.offline == False) or
2316
         (node.drained and not self.op.drained == False))):
2317
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2318
                                 " to master_candidate")
2319

    
2320
    return
2321

    
2322
  def Exec(self, feedback_fn):
2323
    """Modifies a node.
2324

2325
    """
2326
    node = self.node
2327

    
2328
    result = []
2329
    changed_mc = False
2330

    
2331
    if self.op.offline is not None:
2332
      node.offline = self.op.offline
2333
      result.append(("offline", str(self.op.offline)))
2334
      if self.op.offline == True:
2335
        if node.master_candidate:
2336
          node.master_candidate = False
2337
          changed_mc = True
2338
          result.append(("master_candidate", "auto-demotion due to offline"))
2339
        if node.drained:
2340
          node.drained = False
2341
          result.append(("drained", "clear drained status due to offline"))
2342

    
2343
    if self.op.master_candidate is not None:
2344
      node.master_candidate = self.op.master_candidate
2345
      changed_mc = True
2346
      result.append(("master_candidate", str(self.op.master_candidate)))
2347
      if self.op.master_candidate == False:
2348
        rrc = self.rpc.call_node_demote_from_mc(node.name)
2349
        msg = rrc.RemoteFailMsg()
2350
        if msg:
2351
          self.LogWarning("Node failed to demote itself: %s" % msg)
2352

    
2353
    if self.op.drained is not None:
2354
      node.drained = self.op.drained
2355
      result.append(("drained", str(self.op.drained)))
2356
      if self.op.drained == True:
2357
        if node.master_candidate:
2358
          node.master_candidate = False
2359
          changed_mc = True
2360
          result.append(("master_candidate", "auto-demotion due to drain"))
2361
        if node.offline:
2362
          node.offline = False
2363
          result.append(("offline", "clear offline status due to drain"))
2364

    
2365
    # this will trigger configuration file update, if needed
2366
    self.cfg.Update(node)
2367
    # this will trigger job queue propagation or cleanup
2368
    if changed_mc:
2369
      self.context.ReaddNode(node)
2370

    
2371
    return result
2372

    
2373

    
2374
class LUQueryClusterInfo(NoHooksLU):
2375
  """Query cluster configuration.
2376

2377
  """
2378
  _OP_REQP = []
2379
  REQ_BGL = False
2380

    
2381
  def ExpandNames(self):
2382
    self.needed_locks = {}
2383

    
2384
  def CheckPrereq(self):
2385
    """No prerequsites needed for this LU.
2386

2387
    """
2388
    pass
2389

    
2390
  def Exec(self, feedback_fn):
2391
    """Return cluster config.
2392

2393
    """
2394
    cluster = self.cfg.GetClusterInfo()
2395
    result = {
2396
      "software_version": constants.RELEASE_VERSION,
2397
      "protocol_version": constants.PROTOCOL_VERSION,
2398
      "config_version": constants.CONFIG_VERSION,
2399
      "os_api_version": constants.OS_API_VERSION,
2400
      "export_version": constants.EXPORT_VERSION,
2401
      "architecture": (platform.architecture()[0], platform.machine()),
2402
      "name": cluster.cluster_name,
2403
      "master": cluster.master_node,
2404
      "default_hypervisor": cluster.default_hypervisor,
2405
      "enabled_hypervisors": cluster.enabled_hypervisors,
2406
      "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2407
                        for hypervisor in cluster.enabled_hypervisors]),
2408
      "beparams": cluster.beparams,
2409
      "candidate_pool_size": cluster.candidate_pool_size,
2410
      }
2411

    
2412
    return result
2413

    
2414

    
2415
class LUQueryConfigValues(NoHooksLU):
2416
  """Return configuration values.
2417

2418
  """
2419
  _OP_REQP = []
2420
  REQ_BGL = False
2421
  _FIELDS_DYNAMIC = utils.FieldSet()
2422
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2423

    
2424
  def ExpandNames(self):
2425
    self.needed_locks = {}
2426

    
2427
    _CheckOutputFields(static=self._FIELDS_STATIC,
2428
                       dynamic=self._FIELDS_DYNAMIC,
2429
                       selected=self.op.output_fields)
2430

    
2431
  def CheckPrereq(self):
2432
    """No prerequisites.
2433

2434
    """
2435
    pass
2436

    
2437
  def Exec(self, feedback_fn):
2438
    """Dump a representation of the cluster config to the standard output.
2439

2440
    """
2441
    values = []
2442
    for field in self.op.output_fields:
2443
      if field == "cluster_name":
2444
        entry = self.cfg.GetClusterName()
2445
      elif field == "master_node":
2446
        entry = self.cfg.GetMasterNode()
2447
      elif field == "drain_flag":
2448
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2449
      else:
2450
        raise errors.ParameterError(field)
2451
      values.append(entry)
2452
    return values
2453

    
2454

    
2455
class LUActivateInstanceDisks(NoHooksLU):
2456
  """Bring up an instance's disks.
2457

2458
  """
2459
  _OP_REQP = ["instance_name"]
2460
  REQ_BGL = False
2461

    
2462
  def ExpandNames(self):
2463
    self._ExpandAndLockInstance()
2464
    self.needed_locks[locking.LEVEL_NODE] = []
2465
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2466

    
2467
  def DeclareLocks(self, level):
2468
    if level == locking.LEVEL_NODE:
2469
      self._LockInstancesNodes()
2470

    
2471
  def CheckPrereq(self):
2472
    """Check prerequisites.
2473

2474
    This checks that the instance is in the cluster.
2475

2476
    """
2477
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2478
    assert self.instance is not None, \
2479
      "Cannot retrieve locked instance %s" % self.op.instance_name
2480
    _CheckNodeOnline(self, self.instance.primary_node)
2481

    
2482
  def Exec(self, feedback_fn):
2483
    """Activate the disks.
2484

2485
    """
2486
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2487
    if not disks_ok:
2488
      raise errors.OpExecError("Cannot activate block devices")
2489

    
2490
    return disks_info
2491

    
2492

    
2493
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2494
  """Prepare the block devices for an instance.
2495

2496
  This sets up the block devices on all nodes.
2497

2498
  @type lu: L{LogicalUnit}
2499
  @param lu: the logical unit on whose behalf we execute
2500
  @type instance: L{objects.Instance}
2501
  @param instance: the instance for whose disks we assemble
2502
  @type ignore_secondaries: boolean
2503
  @param ignore_secondaries: if true, errors on secondary nodes
2504
      won't result in an error return from the function
2505
  @return: a tuple of (disks_ok, device_info); device_info is a list of
2506
      (host, instance_visible_name, node_visible_name)
2507
      with the mapping from node devices to instance devices
2508

2509
  """
2510
  device_info = []
2511
  disks_ok = True
2512
  iname = instance.name
2513
  # With the two-pass mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it
2516

    
2517
  # The proper fix would be to wait (with some limits) until the
2518
  # connection has been made and drbd transitions from WFConnection
2519
  # into any other network-connected state (Connected, SyncTarget,
2520
  # SyncSource, etc.)
2521

    
2522
  # 1st pass, assemble on all nodes in secondary mode
2523
  for inst_disk in instance.disks:
2524
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2525
      lu.cfg.SetDiskID(node_disk, node)
2526
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2527
      msg = result.RemoteFailMsg()
2528
      if msg:
2529
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2530
                           " (is_primary=False, pass=1): %s",
2531
                           inst_disk.iv_name, node, msg)
2532
        if not ignore_secondaries:
2533
          disks_ok = False
2534

    
2535
  # FIXME: race condition on drbd migration to primary
2536

    
2537
  # 2nd pass, do only the primary node
2538
  for inst_disk in instance.disks:
2539
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2540
      if node != instance.primary_node:
2541
        continue
2542
      lu.cfg.SetDiskID(node_disk, node)
2543
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2544
      msg = result.RemoteFailMsg()
2545
      if msg:
2546
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2547
                           " (is_primary=True, pass=2): %s",
2548
                           inst_disk.iv_name, node, msg)
2549
        disks_ok = False
2550
    device_info.append((instance.primary_node, inst_disk.iv_name,
2551
                        result.payload))
2552

    
2553
  # leave the disks configured for the primary node
2554
  # this is a workaround that would be fixed better by
2555
  # improving the logical/physical id handling
2556
  for disk in instance.disks:
2557
    lu.cfg.SetDiskID(disk, instance.primary_node)
2558

    
2559
  return disks_ok, device_info
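# Illustrative return value (hypothetical names and paths): for a one-disk
# DRBD instance, device_info could look like
#   [("node1.example.com", "disk/0", "/dev/drbd0")]
# while disks_ok tells whether every assemble call succeeded (secondary
# failures only count when ignore_secondaries is False).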
2560

    
2561

    
2562
def _StartInstanceDisks(lu, instance, force):
2563
  """Start the disks of an instance.
2564

2565
  """
2566
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2567
                                           ignore_secondaries=force)
2568
  if not disks_ok:
2569
    _ShutdownInstanceDisks(lu, instance)
2570
    if force is not None and not force:
2571
      lu.proc.LogWarning("", hint="If the message above refers to a"
2572
                         " secondary node,"
2573
                         " you can retry the operation using '--force'.")
2574
    raise errors.OpExecError("Disk consistency error")
2575

    
2576

    
2577
class LUDeactivateInstanceDisks(NoHooksLU):
2578
  """Shutdown an instance's disks.
2579

2580
  """
2581
  _OP_REQP = ["instance_name"]
2582
  REQ_BGL = False
2583

    
2584
  def ExpandNames(self):
2585
    self._ExpandAndLockInstance()
2586
    self.needed_locks[locking.LEVEL_NODE] = []
2587
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2588

    
2589
  def DeclareLocks(self, level):
2590
    if level == locking.LEVEL_NODE:
2591
      self._LockInstancesNodes()
2592

    
2593
  def CheckPrereq(self):
2594
    """Check prerequisites.
2595

2596
    This checks that the instance is in the cluster.
2597

2598
    """
2599
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2600
    assert self.instance is not None, \
2601
      "Cannot retrieve locked instance %s" % self.op.instance_name
2602

    
2603
  def Exec(self, feedback_fn):
2604
    """Deactivate the disks
2605

2606
    """
2607
    instance = self.instance
2608
    _SafeShutdownInstanceDisks(self, instance)
2609

    
2610

    
2611
def _SafeShutdownInstanceDisks(lu, instance):
2612
  """Shutdown block devices of an instance.
2613

2614
  This function checks if an instance is running, before calling
2615
  _ShutdownInstanceDisks.
2616

2617
  """
2618
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
2619
                                      [instance.hypervisor])
2620
  ins_l = ins_l[instance.primary_node]
2621
  if ins_l.failed or not isinstance(ins_l.data, list):
2622
    raise errors.OpExecError("Can't contact node '%s'" %
2623
                             instance.primary_node)
2624

    
2625
  if instance.name in ins_l.data:
2626
    raise errors.OpExecError("Instance is running, can't shutdown"
2627
                             " block devices.")
2628

    
2629
  _ShutdownInstanceDisks(lu, instance)
2630

    
2631

    
2632
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2633
  """Shutdown block devices of an instance.
2634

2635
  This does the shutdown on all nodes of the instance.
2636

2637
  If ignore_primary is true, errors on the primary node are
2638
  ignored.
2639

2640
  """
2641
  all_result = True
2642
  for disk in instance.disks:
2643
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2644
      lu.cfg.SetDiskID(top_disk, node)
2645
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2646
      msg = result.RemoteFailMsg()
2647
      if msg:
2648
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2649
                      disk.iv_name, node, msg)
2650
        if not ignore_primary or node != instance.primary_node:
2651
          all_result = False
2652
  return all_result
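# Note: all_result stays True only if every blockdev_shutdown call
# succeeded, except that failures on the primary node do not clear it when
# ignore_primary is True (they still only produce a warning above).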
2653

    
2654

    
2655
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2656
  """Checks if a node has enough free memory.
2657

2658
  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
2661
  exception.
2662

2663
  @type lu: C{LogicalUnit}
2664
  @param lu: a logical unit from which we get configuration data
2665
  @type node: C{str}
2666
  @param node: the node to check
2667
  @type reason: C{str}
2668
  @param reason: string to use in the error message
2669
  @type requested: C{int}
2670
  @param requested: the amount of memory in MiB to check for
2671
  @type hypervisor_name: C{str}
2672
  @param hypervisor_name: the hypervisor to ask for memory stats
2673
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2674
      we cannot check the node
2675

2676
  """
2677
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2678
  nodeinfo[node].Raise()
2679
  free_mem = nodeinfo[node].data.get('memory_free')
2680
  if not isinstance(free_mem, int):
2681
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2682
                             " was '%s'" % (node, free_mem))
2683
  if requested > free_mem:
2684
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2685
                             " needed %s MiB, available %s MiB" %
2686
                             (node, reason, requested, free_mem))
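# Usage sketch (mirrors the call made in LUStartupInstance below):
#   _CheckNodeFreeMemory(self, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        bep[constants.BE_MEMORY], instance.hypervisor)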
2687

    
2688

    
2689
class LUStartupInstance(LogicalUnit):
2690
  """Starts an instance.
2691

2692
  """
2693
  HPATH = "instance-start"
2694
  HTYPE = constants.HTYPE_INSTANCE
2695
  _OP_REQP = ["instance_name", "force"]
2696
  REQ_BGL = False
2697

    
2698
  def ExpandNames(self):
2699
    self._ExpandAndLockInstance()
2700

    
2701
  def BuildHooksEnv(self):
2702
    """Build hooks env.
2703

2704
    This runs on master, primary and secondary nodes of the instance.
2705

2706
    """
2707
    env = {
2708
      "FORCE": self.op.force,
2709
      }
2710
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2711
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2712
    return env, nl, nl
2713

    
2714
  def CheckPrereq(self):
2715
    """Check prerequisites.
2716

2717
    This checks that the instance is in the cluster.
2718

2719
    """
2720
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2721
    assert self.instance is not None, \
2722
      "Cannot retrieve locked instance %s" % self.op.instance_name
2723

    
2724
    _CheckNodeOnline(self, instance.primary_node)
2725

    
2726
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2727
    # check bridge existence
2728
    _CheckInstanceBridgesExist(self, instance)
2729

    
2730
    _CheckNodeFreeMemory(self, instance.primary_node,
2731
                         "starting instance %s" % instance.name,
2732
                         bep[constants.BE_MEMORY], instance.hypervisor)
2733

    
2734
  def Exec(self, feedback_fn):
2735
    """Start the instance.
2736

2737
    """
2738
    instance = self.instance
2739
    force = self.op.force
2740

    
2741
    self.cfg.MarkInstanceUp(instance.name)
2742

    
2743
    node_current = instance.primary_node
2744

    
2745
    _StartInstanceDisks(self, instance, force)
2746

    
2747
    result = self.rpc.call_instance_start(node_current, instance)
2748
    msg = result.RemoteFailMsg()
2749
    if msg:
2750
      _ShutdownInstanceDisks(self, instance)
2751
      raise errors.OpExecError("Could not start instance: %s" % msg)
2752

    
2753

    
2754
class LURebootInstance(LogicalUnit):
2755
  """Reboot an instance.
2756

2757
  """
2758
  HPATH = "instance-reboot"
2759
  HTYPE = constants.HTYPE_INSTANCE
2760
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2761
  REQ_BGL = False
2762

    
2763
  def ExpandNames(self):
2764
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2765
                                   constants.INSTANCE_REBOOT_HARD,
2766
                                   constants.INSTANCE_REBOOT_FULL]:
2767
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2768
                                  (constants.INSTANCE_REBOOT_SOFT,
2769
                                   constants.INSTANCE_REBOOT_HARD,
2770
                                   constants.INSTANCE_REBOOT_FULL))
2771
    self._ExpandAndLockInstance()
2772

    
2773
  def BuildHooksEnv(self):
2774
    """Build hooks env.
2775

2776
    This runs on master, primary and secondary nodes of the instance.
2777

2778
    """
2779
    env = {
2780
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2781
      "REBOOT_TYPE": self.op.reboot_type,
2782
      }
2783
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2784
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2785
    return env, nl, nl
2786

    
2787
  def CheckPrereq(self):
2788
    """Check prerequisites.
2789

2790
    This checks that the instance is in the cluster.
2791

2792
    """
2793
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2794
    assert self.instance is not None, \
2795
      "Cannot retrieve locked instance %s" % self.op.instance_name
2796

    
2797
    _CheckNodeOnline(self, instance.primary_node)
2798

    
2799
    # check bridge existence
2800
    _CheckInstanceBridgesExist(self, instance)
2801

    
2802
  def Exec(self, feedback_fn):
2803
    """Reboot the instance.
2804

2805
    """
2806
    instance = self.instance
2807
    ignore_secondaries = self.op.ignore_secondaries
2808
    reboot_type = self.op.reboot_type
2809

    
2810
    node_current = instance.primary_node
2811

    
2812
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2813
                       constants.INSTANCE_REBOOT_HARD]:
2814
      for disk in instance.disks:
2815
        self.cfg.SetDiskID(disk, node_current)
2816
      result = self.rpc.call_instance_reboot(node_current, instance,
2817
                                             reboot_type)
2818
      msg = result.RemoteFailMsg()
2819
      if msg:
2820
        raise errors.OpExecError("Could not reboot instance: %s" % msg)
2821
    else:
2822
      result = self.rpc.call_instance_shutdown(node_current, instance)
2823
      msg = result.RemoteFailMsg()
2824
      if msg:
2825
        raise errors.OpExecError("Could not shutdown instance for"
2826
                                 " full reboot: %s" % msg)
2827
      _ShutdownInstanceDisks(self, instance)
2828
      _StartInstanceDisks(self, instance, ignore_secondaries)
2829
      result = self.rpc.call_instance_start(node_current, instance)
2830
      msg = result.RemoteFailMsg()
2831
      if msg:
2832
        _ShutdownInstanceDisks(self, instance)
2833
        raise errors.OpExecError("Could not start instance for"
2834
                                 " full reboot: %s" % msg)
2835

    
2836
    self.cfg.MarkInstanceUp(instance.name)
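    # Summary: SOFT and HARD reboots are delegated to the hypervisor via
    # call_instance_reboot, while a FULL reboot is emulated as an instance
    # shutdown, a disk deactivation/reactivation cycle and a fresh start on
    # the primary node.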
2837

    
2838

    
2839
class LUShutdownInstance(LogicalUnit):
2840
  """Shutdown an instance.
2841

2842
  """
2843
  HPATH = "instance-stop"
2844
  HTYPE = constants.HTYPE_INSTANCE
2845
  _OP_REQP = ["instance_name"]
2846
  REQ_BGL = False
2847

    
2848
  def ExpandNames(self):
2849
    self._ExpandAndLockInstance()
2850

    
2851
  def BuildHooksEnv(self):
2852
    """Build hooks env.
2853

2854
    This runs on master, primary and secondary nodes of the instance.
2855

2856
    """
2857
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2858
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2859
    return env, nl, nl
2860

    
2861
  def CheckPrereq(self):
2862
    """Check prerequisites.
2863

2864
    This checks that the instance is in the cluster.
2865

2866
    """
2867
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2868
    assert self.instance is not None, \
2869
      "Cannot retrieve locked instance %s" % self.op.instance_name
2870
    _CheckNodeOnline(self, self.instance.primary_node)
2871

    
2872
  def Exec(self, feedback_fn):
2873
    """Shutdown the instance.
2874

2875
    """
2876
    instance = self.instance
2877
    node_current = instance.primary_node
2878
    self.cfg.MarkInstanceDown(instance.name)
2879
    result = self.rpc.call_instance_shutdown(node_current, instance)
2880
    msg = result.RemoteFailMsg()
2881
    if msg:
2882
      self.proc.LogWarning("Could not shutdown instance: %s" % msg)
2883

    
2884
    _ShutdownInstanceDisks(self, instance)
2885

    
2886

    
2887
class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, instance.primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    if remote_info.failed or remote_info.data:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   self.op.pnode)
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
      result.Raise()
      if not isinstance(result.data, objects.OS):
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node" % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.Update(inst)

    _StartInstanceDisks(self, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      result = self.rpc.call_instance_os_add(inst.primary_node, inst)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s: %s" %
                                 (inst.name, inst.primary_node, msg))
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    _CheckNodeOnline(self, instance.primary_node)

    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise()
    if remote_info.data:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
                                                     old_file_storage_dir,
                                                     new_file_storage_dir)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result.data[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self, inst, None)
    try:
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
                                                 old_name)
      msg = result.RemoteFailMsg()
      if msg:
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti): %s" %
               (inst.name, inst.primary_node, msg))
        self.proc.LogWarning(msg)
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logging.info("Shutting down instance %s on node %s",
                 instance.name, instance.primary_node)

    result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
    msg = result.RemoteFailMsg()
    if msg:
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance: %s" % msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, instance.primary_node, msg))

    logging.info("Removing block devices for instance %s", instance.name)

    if not _RemoveDisks(self, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logging.info("Removing instance %s out of cluster config", instance.name)

    self.cfg.RemoveInstance(instance.name)
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name


class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names", "use_locking"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
                                    "admin_state",
                                    "disk_template", "ip", "mac", "bridge",
                                    "sda_size", "sdb_size", "vcpus", "tags",
                                    "network_port", "beparams",
                                    r"(disk)\.(size)/([0-9]+)",
                                    r"(disk)\.(sizes)", "disk_usage",
                                    r"(nic)\.(mac|ip|bridge)/([0-9]+)",
                                    r"(nic)\.(macs|ips|bridges)",
                                    r"(disk|nic)\.(count)",
                                    "serial_no", "hypervisor", "hvparams",] +
                                  ["hv/%s" % name
                                   for name in constants.HVS_PARAMETERS] +
                                  ["be/%s" % name
                                   for name in constants.BES_PARAMETERS])
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.do_locking:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    all_info = self.cfg.GetAllInstancesInfo()
    if self.wanted == locking.ALL_SET:
      # caller didn't specify instance names, so ordering is not important
      if self.do_locking:
        instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
      else:
        instance_names = all_info.keys()
      instance_names = utils.NiceSort(instance_names)
    else:
      # caller did specify names, so we must keep the ordering
      if self.do_locking:
        tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
      else:
        tgt_set = all_info.keys()
      missing = set(self.wanted).difference(tgt_set)
      if missing:
        raise errors.OpExecError("Some instances were removed before"
                                 " retrieving their data: %s" % missing)
      instance_names = self.wanted

    instance_list = [all_info[iname] for iname in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])
    hv_list = list(set([inst.hypervisor for inst in instance_list]))

    bad_nodes = []
    off_nodes = []
    if self.do_node_query:
      live_data = {}
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
      for name in nodes:
        result = node_data[name]
        if result.offline:
          # offline nodes will be in both lists
          off_nodes.append(name)
        if result.failed:
          bad_nodes.append(name)
        else:
          if result.data:
            live_data.update(result.data)
            # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    HVPREFIX = "hv/"
    BEPREFIX = "be/"
    output = []
    for instance in instance_list:
      iout = []
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
      for field in self.op.output_fields:
        st_match = self._FIELDS_STATIC.Matches(field)
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = instance.admin_up
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in off_nodes:
            val = "ERROR_nodeoffline"
          elif instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.admin_up:
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.admin_up:
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          idx = ord(field[2]) - ord('a')
          try:
            val = instance.FindDisk(idx).size
          except errors.OpPrereqError:
            val = None
        elif field == "disk_usage": # total disk usage per node
          disk_sizes = [{'size': disk.size} for disk in instance.disks]
          val = _ComputeDiskSize(instance.disk_template, disk_sizes)
        elif field == "tags":
          val = list(instance.GetTags())
        elif field == "serial_no":
          val = instance.serial_no
        elif field == "network_port":
          val = instance.network_port
        elif field == "hypervisor":
          val = instance.hypervisor
        elif field == "hvparams":
          val = i_hv
        elif (field.startswith(HVPREFIX) and
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
          val = i_hv.get(field[len(HVPREFIX):], None)
        elif field == "beparams":
          val = i_be
        elif (field.startswith(BEPREFIX) and
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
          val = i_be.get(field[len(BEPREFIX):], None)
        elif st_match and st_match.groups():
          # matches a variable list
          st_groups = st_match.groups()
          if st_groups and st_groups[0] == "disk":
            if st_groups[1] == "count":
              val = len(instance.disks)
            elif st_groups[1] == "sizes":
              val = [disk.size for disk in instance.disks]
            elif st_groups[1] == "size":
              try:
                val = instance.FindDisk(st_groups[2]).size
              except errors.OpPrereqError:
                val = None
            else:
              assert False, "Unhandled disk parameter"
          elif st_groups[0] == "nic":
            if st_groups[1] == "count":
              val = len(instance.nics)
            elif st_groups[1] == "macs":
              val = [nic.mac for nic in instance.nics]
            elif st_groups[1] == "ips":
              val = [nic.ip for nic in instance.nics]
            elif st_groups[1] == "bridges":
              val = [nic.bridge for nic in instance.nics]
            else:
              # index-based item
              nic_idx = int(st_groups[2])
              if nic_idx >= len(instance.nics):
                val = None
              else:
                if st_groups[1] == "mac":
                  val = instance.nics[nic_idx].mac
                elif st_groups[1] == "ip":
                  val = instance.nics[nic_idx].ip
                elif st_groups[1] == "bridge":
                  val = instance.nics[nic_idx].bridge
                else:
                  assert False, "Unhandled NIC parameter"
          else:
            assert False, "Unhandled variable parameter"
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output


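# Summary of the "status" field computed in LUQueryInstances.Exec above
# (descriptive note only, derived from the branches of that code):
#   "ERROR_nodeoffline" - the primary node is marked offline
#   "ERROR_nodedown"    - the primary node did not answer the query
#   "running"           - instance is live and marked admin-up
#   "ERROR_up"          - instance is live but marked admin-down
#   "ERROR_down"        - instance is not live but marked admin-up
#   "ADMIN_down"        - instance is not live and marked admin-down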
class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    _CheckNodeOnline(self, target_node)
    _CheckNodeNotDrained(self, target_node)
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
                         instance.name, bep[constants.BE_MEMORY],
                         instance.hypervisor)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    result = self.rpc.call_bridges_exist(target_node, brlist)
    result.Raise()
    if not result.data:
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self, dev, target_node, False):
        if instance.admin_up and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 instance.name, source_node)

    result = self.rpc.call_instance_shutdown(source_node, instance)
    msg = result.RemoteFailMsg()
    if msg:
      if self.op.ignore_consistency:
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
                             " Proceeding anyway. Please make sure node"
                             " %s is down. Error details: %s",
                             instance.name, source_node, source_node, msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, source_node, msg))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.admin_up:
      feedback_fn("* activating the instance's disks on target node")
      logging.info("Starting instance %s on node %s",
                   instance.name, target_node)

      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      result = self.rpc.call_instance_start(target_node, instance)
      msg = result.RemoteFailMsg()
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
                                 (instance.name, target_node, msg))


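# Illustrative only (not part of the original module): a failover is normally
# requested by submitting the corresponding opcode from opcodes.py, which the
# master then dispatches to LUFailoverInstance above. A minimal sketch,
# assuming the usual client helpers:
#
#   op = opcodes.OpFailoverInstance(instance_name="instance1.example.com",
#                                   ignore_consistency=False)
#   # submitted e.g. via cli.SubmitOpCode(op) from a command-line script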
class LUMigrateInstance(LogicalUnit):
  """Migrate an instance.

  This is migration without shutting down, compared to the failover,
  which is done with shutdown.

  """
  HPATH = "instance-migrate"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "live", "cleanup"]

  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["MIGRATE_LIVE"] = self.op.live
    env["MIGRATE_CLEANUP"] = self.op.cleanup
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    if instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " drbd8, cannot migrate.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ConfigurationError("No secondary node but using"
                                      " drbd8 disk template")

    i_be = self.cfg.GetClusterInfo().FillBE(instance)

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
                         instance.name, i_be[constants.BE_MEMORY],
                         instance.hypervisor)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    result = self.rpc.call_bridges_exist(target_node, brlist)
    if result.failed or not result.data:
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

    if not self.op.cleanup:
      _CheckNodeNotDrained(self, target_node)
      result = self.rpc.call_instance_migratable(instance.primary_node,
                                                 instance)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
                                   msg)

    self.instance = instance

  def _WaitUntilSync(self):
    """Poll with custom rpc for disk sync.

    This uses our own step-based rpc call.

    """
    self.feedback_fn("* wait until resync is done")
    all_done = False
    while not all_done:
      all_done = True
      result = self.rpc.call_drbd_wait_sync(self.all_nodes,
                                            self.nodes_ip,
                                            self.instance.disks)
      min_percent = 100
      for node, nres in result.items():
        msg = nres.RemoteFailMsg()
        if msg:
          raise errors.OpExecError("Cannot resync disks on node %s: %s" %
                                   (node, msg))
        node_done, node_percent = nres.payload
        all_done = all_done and node_done
        if node_percent is not None:
          min_percent = min(min_percent, node_percent)
      if not all_done:
        if min_percent < 100:
          self.feedback_fn("   - progress: %.1f%%" % min_percent)
        time.sleep(2)

  def _EnsureSecondary(self, node):
    """Demote a node to secondary.

    """
    self.feedback_fn("* switching node %s to secondary mode" % node)

    for dev in self.instance.disks:
      self.cfg.SetDiskID(dev, node)

    result = self.rpc.call_blockdev_close(node, self.instance.name,
                                          self.instance.disks)
    msg = result.RemoteFailMsg()
    if msg:
      raise errors.OpExecError("Cannot change disk to secondary on node %s,"
                               " error %s" % (node, msg))

  def _GoStandalone(self):
    """Disconnect from the network.

    """
    self.feedback_fn("* changing into standalone mode")
    result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
                                               self.instance.disks)
    for node, nres in result.items():
      msg = nres.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Cannot disconnect disks node %s,"
                                 " error %s" % (node, msg))

  def _GoReconnect(self, multimaster):
    """Reconnect to the network.

    """
    if multimaster:
      msg = "dual-master"
    else:
      msg = "single-master"
    self.feedback_fn("* changing disks into %s mode" % msg)
    result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
                                           self.instance.disks,
                                           self.instance.name, multimaster)
    for node, nres in result.items():
      msg = nres.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Cannot change disks config on node %s,"
                                 " error: %s" % (node, msg))

  def _ExecCleanup(self):
    """Try to cleanup after a failed migration.

    The cleanup is done by:
      - check that the instance is running only on one node
        (and update the config if needed)
      - change disks on its secondary node to secondary
      - wait until disks are fully synchronized
      - disconnect from the network
      - change disks into single-master mode
      - wait again until disks are fully synchronized

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    # check running on only one node
    self.feedback_fn("* checking where the instance actually runs"
                     " (if this hangs, the hypervisor might be in"
                     " a bad state)")
    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
    for node, result in ins_l.items():
      result.Raise()
      if not isinstance(result.data, list):
        raise errors.OpExecError("Can't contact node '%s'" % node)

    runningon_source = instance.name in ins_l[source_node].data
    runningon_target = instance.name in ins_l[target_node].data

    if runningon_source and runningon_target:
      raise errors.OpExecError("Instance seems to be running on two nodes,"
                               " or the hypervisor is confused. You will have"
                               " to ensure manually that it runs only on one"
                               " and restart this operation.")

    if not (runningon_source or runningon_target):
      raise errors.OpExecError("Instance does not seem to be running at all."
                               " In this case, it's safer to repair by"
                               " running 'gnt-instance stop' to ensure disk"
                               " shutdown, and then restarting it.")

    if runningon_target:
      # the migration has actually succeeded, we need to update the config
      self.feedback_fn("* instance running on secondary node (%s),"
                       " updating config" % target_node)
      instance.primary_node = target_node
      self.cfg.Update(instance)
      demoted_node = source_node
    else:
      self.feedback_fn("* instance confirmed to be running on its"
                       " primary node (%s)" % source_node)
      demoted_node = target_node

    self._EnsureSecondary(demoted_node)
    try:
      self._WaitUntilSync()
    except errors.OpExecError:
      # we ignore errors here, since if the device is standalone, it
      # won't be able to sync
      pass
    self._GoStandalone()
    self._GoReconnect(False)
    self._WaitUntilSync()

    self.feedback_fn("* done")

  def _RevertDiskStatus(self):
    """Try to revert the disk status after a failed migration.

    """
    target_node = self.target_node
    try:
      self._EnsureSecondary(target_node)
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()
    except errors.OpExecError, err:
      self.LogWarning("Migration failed and I can't reconnect the"
                      " drives: error '%s'\n"
                      "Please look and recover the instance status" %
                      str(err))

  def _AbortMigration(self):
    """Call the hypervisor code to abort a started migration.

    """
    instance = self.instance
    target_node = self.target_node
    migration_info = self.migration_info

    abort_result = self.rpc.call_finalize_migration(target_node,
                                                    instance,
                                                    migration_info,
                                                    False)
    abort_msg = abort_result.RemoteFailMsg()
    if abort_msg:
      logging.error("Aborting migration failed on target node %s: %s" %
                    (target_node, abort_msg))
      # Don't raise an exception here, as we still have to try to revert the
      # disk status, even if this step failed.

  def _ExecMigration(self):
    """Migrate an instance.

    The migrate is done by:
      - change the disks into dual-master mode
      - wait until disks are fully synchronized again
      - migrate the instance
      - change disks on the new secondary node (the old primary) to secondary
      - wait until disks are fully synchronized
      - change disks into single-master mode

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    self.feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      if not _CheckDiskConsistency(self, dev, target_node, False):
        raise errors.OpExecError("Disk %s is degraded or not fully"
                                 " synchronized on target node,"
                                 " aborting migrate." % dev.iv_name)

    # First get the migration information from the remote node
    result = self.rpc.call_migration_info(source_node, instance)
    msg = result.RemoteFailMsg()
    if msg:
      log_err = ("Failed fetching source migration information from %s: %s" %
                 (source_node, msg))
      logging.error(log_err)
      raise errors.OpExecError(log_err)

    self.migration_info = migration_info = result.payload

    # Then switch the disks to master/master mode
    self._EnsureSecondary(target_node)
    self._GoStandalone()
    self._GoReconnect(True)
    self._WaitUntilSync()

    self.feedback_fn("* preparing %s to accept the instance" % target_node)
    result = self.rpc.call_accept_instance(target_node,
                                           instance,
                                           migration_info,
                                           self.nodes_ip[target_node])

    msg = result.RemoteFailMsg()
    if msg:
      logging.error("Instance pre-migration failed, trying to revert"
                    " disk status: %s", msg)
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
                               (instance.name, msg))

    self.feedback_fn("* migrating instance to %s" % target_node)
    time.sleep(10)
    result = self.rpc.call_instance_migrate(source_node, instance,
                                            self.nodes_ip[target_node],
                                            self.op.live)
    msg = result.RemoteFailMsg()
    if msg:
      logging.error("Instance migration failed, trying to revert"
                    " disk status: %s", msg)
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not migrate instance %s: %s" %
                               (instance.name, msg))
    time.sleep(10)

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    result = self.rpc.call_finalize_migration(target_node,
                                              instance,
                                              migration_info,
                                              True)
    msg = result.RemoteFailMsg()
    if msg:
      logging.error("Instance migration succeeded, but finalization failed:"
                    " %s" % msg)
      raise errors.OpExecError("Could not finalize instance migration: %s" %
                               msg)

    self._EnsureSecondary(source_node)
    self._WaitUntilSync()
    self._GoStandalone()
    self._GoReconnect(False)
    self._WaitUntilSync()

    self.feedback_fn("* done")

  def Exec(self, feedback_fn):
    """Perform the migration.

    """
    self.feedback_fn = feedback_fn

    self.source_node = self.instance.primary_node
    self.target_node = self.instance.secondary_nodes[0]
    self.all_nodes = [self.source_node, self.target_node]
    self.nodes_ip = {
      self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
      self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
      }
    if self.op.cleanup:
      return self._ExecCleanup()
    else:
      return self._ExecMigration()


def _CreateBlockDev(lu, node, instance, device, force_create,
                    info, force_open):
  """Create a tree of block devices on a given node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  @param lu: the lu on whose behalf we execute
  @param node: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @type force_create: boolean
  @param force_create: whether to force creation of this device; this
      will be changed to True whenever we find a device which has
      the CreateOnSecondary() attribute
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution

  """
  if device.CreateOnSecondary():
    force_create = True

  if device.children:
    for child in device.children:
      _CreateBlockDev(lu, node, instance, child, force_create,
                      info, force_open)

  if not force_create:
    return

  _CreateSingleBlockDev(lu, node, instance, device, info, force_open)


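# Illustrative usage of _CreateBlockDev above (this mirrors the loop in
# _CreateDisks further below): the caller walks all nodes of the instance and
# forces creation only on the primary node, letting CreateOnSecondary()
# decide for the secondaries:
#
#   for node in instance.all_nodes:
#     f_create = node == pnode
#     _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)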
def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
  """Create a single block device on a given node.

  This will not recurse over children of the device, so they must be
  created in advance.

  @param lu: the lu on whose behalf we execute
  @param node: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution

  """
  lu.cfg.SetDiskID(device, node)
  result = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, force_open, info)
  msg = result.RemoteFailMsg()
  if msg:
    raise errors.OpExecError("Can't create block device %s on"
                             " node %s for instance %s: %s" %
                             (device, node, instance.name, msg))
  if device.physical_id is None:
    device.physical_id = result.payload


def _GenerateUniqueNames(lu, exts):
  """Generate suitable LV names.

  This will generate one unique logical volume name for each of the
  given extensions (suffixes).

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results


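# Illustrative example of _GenerateUniqueNames above (the exact format of the
# unique ID comes from ConfigWriter.GenerateUniqueID and is not reproduced
# here):
#
#   _GenerateUniqueNames(lu, [".disk0", ".disk1"])
#   # -> ["<unique-id>.disk0", "<unique-id>.disk1"]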
def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
                         p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  port = lu.cfg.AllocatePort()
  vgname = lu.cfg.GetVGName()
  shared_secret = lu.cfg.GenerateDRBDSecret()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev


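# Sketch of the disk tree returned by _GenerateDRBD8Branch above (derived
# from the constructor calls in that function):
#
#   LD_DRBD8, size=<size>, iv_name=<iv_name>,
#   logical_id=(primary, secondary, port, p_minor, s_minor, shared_secret)
#     +- child 0: LD_LV "<names[0]>", <size> MB (data volume)
#     +- child 1: LD_LV "<names[1]>", 128 MB (DRBD metadata volume)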
def _GenerateDiskTemplate(lu, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_info,
                          file_storage_dir, file_driver,
                          base_index):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []
  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(lu, [".disk%d" % i
                                      for i in range(disk_count)])
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
                              logical_id=(vgname, names[idx]),
                              iv_name="disk/%d" % disk_index,
                              mode=disk["mode"])
      disks.append(disk_dev)
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node, remote_node] * len(disk_info), instance_name)

    names = []
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
                                               for i in range(disk_count)]):
      names.append(lv_prefix + "_data")
      names.append(lv_prefix + "_meta")
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                      disk["size"], names[idx*2:idx*2+2],
                                      "disk/%d" % disk_index,
                                      minors[idx*2], minors[idx*2+1])
      disk_dev.mode = disk["mode"]
      disks.append(disk_dev)
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
                              iv_name="disk/%d" % disk_index,
                              logical_id=(file_driver,
                                          "%s/disk%d" % (file_storage_dir,
                                                         disk_index)),
                              mode=disk["mode"])
      disks.append(disk_dev)
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks


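# Illustrative call of _GenerateDiskTemplate above (hypothetical values,
# following the DT_PLAIN branch):
#
#   _GenerateDiskTemplate(lu, constants.DT_PLAIN, "inst1.example.com",
#                         "node1.example.com", [],
#                         [{"size": 1024, "mode": constants.DISK_RDWR}],
#                         None, None, 0)
#   # -> one LD_LV disk of 1024 MB, iv_name "disk/0",
#   #    logical_id (<vgname>, "<unique-id>.disk0")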
def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


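# Example: for an instance named "inst1.example.com", _GetInstanceInfoText
# returns "originstname+inst1.example.com", which is attached to the
# instance's volumes as the extra metadata (LVM tag) mentioned in the
# _CreateBlockDev docstring above.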
def _CreateDisks(lu, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @rtype: boolean
  @return: the success of the creation

  """
  info = _GetInstanceInfoText(instance)
  pnode = instance.primary_node

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)

    if result.failed or not result.data:
      raise errors.OpExecError("Could not connect to node '%s'" % pnode)

    if not result.data[0]:
      raise errors.OpExecError("Failed to create directory '%s'" %
                               file_storage_dir)

  # Note: this needs to be kept in sync with adding of disks in
  # LUSetInstanceParams
  for device in instance.disks:
    logging.info("Creating volume %s for instance %s",
                 device.iv_name, instance.name)
    #HARDCODE
    for node in instance.all_nodes:
      f_create = node == pnode
      _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)


def _RemoveDisks(lu, instance):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should remove
  @rtype: boolean
  @return: the success of the removal

  """
  logging.info("Removing block devices for instance %s", instance.name)

  all_result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(disk, node)
      msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
      if msg:
        lu.LogWarning("Could not remove block device %s on node %s,"
                      " continuing anyway: %s", device.iv_name, node, msg)
        all_result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
                                                 file_storage_dir)
    if result.failed or not result.data:
      logging.error("Could not remove directory '%s'", file_storage_dir)
      all_result = False

  return all_result


def _ComputeDiskSize(disk_template, disks):
  """Compute disk size requirements in the volume group

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: sum(d["size"] for d in disks),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]


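# Worked example for _ComputeDiskSize above (the values follow directly from
# req_size_dict): for two disks of 1024 MB each,
#   _ComputeDiskSize(constants.DT_PLAIN, disks) -> 1024 + 1024 = 2048 MB
#   _ComputeDiskSize(constants.DT_DRBD8, disks) -> (1024 + 128) * 2 = 2304 MB
#   _ComputeDiskSize(constants.DT_FILE, disks)  -> None (no VG space needed)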
def _CheckHVParams(lu, nodenames, hvname, hvparams):
  """Hypervisor parameter validation.

  This function abstracts the hypervisor parameter validation to be
  used in both instance create and instance modify.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type nodenames: list
  @param nodenames: the list of nodes on which we should check
  @type hvname: string
  @param hvname: the name of the hypervisor we should use
  @type hvparams: dict
  @param hvparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
                                                  hvname,
                                                  hvparams)
  for node in nodenames:
    info = hvinfo[node]
    if info.offline:
      continue
    msg = info.RemoteFailMsg()
    if msg:
      raise errors.OpPrereqError("Hypervisor parameter validation failed:"
                                 " %s" % msg)


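# A minimal usage sketch for _CheckHVParams above (hypothetical names; the
# real callers live in the instance create/modify LUs, which pass the nodes
# relevant to the instance plus the requested hypervisor and its parameters):
#
#   _CheckHVParams(self, [pnode_name], self.op.hypervisor, self.op.hvparams)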
class LUCreateInstance(LogicalUnit):
4208
  """Create an instance.
4209

4210
  """
4211
  HPATH = "instance-add"
4212
  HTYPE = constants.HTYPE_INSTANCE
4213
  _OP_REQP = ["instance_name", "disks", "disk_template",
4214
              "mode", "start",
4215
              "wait_for_sync", "ip_check", "nics",
4216
              "hvparams", "beparams"]
4217
  REQ_BGL = False
4218

    
4219
  def _ExpandNode(self, node):
4220
    """Expands and checks one node name.
4221

4222
    """
4223
    node_full = self.cfg.ExpandNodeName(node)
4224
    if node_full is None:
4225
      raise errors.OpPrereqError("Unknown node %s" % node)
4226
    return node_full
4227

    
4228
  def ExpandNames(self):
4229
    """ExpandNames for CreateInstance.
4230

4231
    Figure out the right locks for instance creation.
4232

4233
    """
4234
    self.needed_locks = {}
4235

    
4236
    # set optional parameters to none if they don't exist
4237
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4238
      if not hasattr(self.op, attr):
4239
        setattr(self.op, attr, None)
4240

    
4241
    # cheap checks, mostly valid constants given
4242

    
4243
    # verify creation mode
4244
    if self.op.mode not in (constants.INSTANCE_CREATE,
4245
                            constants.INSTANCE_IMPORT):
4246
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4247
                                 self.op.mode)
4248

    
4249
    # disk template and mirror node verification
4250
    if self.op.disk_template not in constants.DISK_TEMPLATES:
4251
      raise errors.OpPrereqError("Invalid disk template name")
4252

    
4253
    if self.op.hypervisor is None:
4254
      self.op.hypervisor = self.cfg.GetHypervisorType()
4255

    
4256
    cluster = self.cfg.GetClusterInfo()
4257
    enabled_hvs = cluster.enabled_hypervisors
4258
    if self.op.hypervisor not in enabled_hvs:
4259
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4260
                                 " cluster (%s)" % (self.op.hypervisor,
4261
                                  ",".join(enabled_hvs)))
4262

    
4263
    # check hypervisor parameter syntax (locally)
4264
    utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4265
    filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4266
                                  self.op.hvparams)
4267
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4268
    hv_type.CheckParameterSyntax(filled_hvp)
4269

    
4270
    # fill and remember the beparams dict
4271
    utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4272
    self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4273
                                    self.op.beparams)
4274

    
4275
    #### instance parameters check
4276

    
4277
    # instance name verification
4278
    hostname1 = utils.HostInfo(self.op.instance_name)
4279
    self.op.instance_name = instance_name = hostname1.name
4280

    
4281
    # this is just a preventive check, but someone might still add this
4282
    # instance in the meantime, and creation will fail at lock-add time
4283
    if instance_name in self.cfg.GetInstanceList():
4284
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4285
                                 instance_name)
4286

    
4287
    self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4288

    
4289
    # NIC buildup
4290
    self.nics = []
4291
    for nic in self.op.nics:
4292
      # ip validity checks
4293
      ip = nic.get("ip", None)
4294
      if ip is None or ip.lower() == "none":
4295
        nic_ip = None
4296
      elif ip.lower() == constants.VALUE_AUTO:
4297
        nic_ip = hostname1.ip
4298
      else:
4299
        if not utils.IsValidIP(ip):
4300
          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4301
                                     " like a valid IP" % ip)
4302
        nic_ip = ip
4303

    
4304
      # MAC address verification
4305
      mac = nic.get("mac", constants.VALUE_AUTO)
4306
      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4307
        if not utils.IsValidMac(mac.lower()):
4308
          raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4309
                                     mac)
4310
      # bridge verification
4311
      bridge = nic.get("bridge", None)
4312
      if bridge is None:
4313
        bridge = self.cfg.GetDefBridge()
4314
      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4315

    
4316
    # disk checks/pre-build
4317
    self.disks = []
4318
    for disk in self.op.disks:
4319
      mode = disk.get("mode", constants.DISK_RDWR)
4320
      if mode not in constants.DISK_ACCESS_SET:
4321
        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4322
                                   mode)
4323
      size = disk.get("size", None)
4324
      if size is None:
4325
        raise errors.OpPrereqError("Missing disk size")
4326
      try:
4327
        size = int(size)
4328
      except ValueError:
4329
        raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4330
      self.disks.append({"size": size, "mode": mode})
4331

    
4332
    # used in CheckPrereq for ip ping check
4333
    self.check_ip = hostname1.ip
4334

    
4335
    # file storage checks
4336
    if (self.op.file_driver and
4337
        not self.op.file_driver in constants.FILE_DRIVER):
4338
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
4339
                                 self.op.file_driver)
4340

    
4341
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4342
      raise errors.OpPrereqError("File storage directory path not absolute")
4343

    
4344
    ### Node/iallocator related checks
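    # exactly one of iallocator and pnode must be given: either the user
    # picks the primary node explicitly or delegates the choice to an
    # allocator script, but not both and not neither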
4345
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
4346
      raise errors.OpPrereqError("One and only one of iallocator and primary"
4347
                                 " node must be given")
4348

    
4349
    if self.op.iallocator:
4350
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4351
    else:
4352
      self.op.pnode = self._ExpandNode(self.op.pnode)
4353
      nodelist = [self.op.pnode]
4354
      if self.op.snode is not None:
4355
        self.op.snode = self._ExpandNode(self.op.snode)
4356
        nodelist.append(self.op.snode)
4357
      self.needed_locks[locking.LEVEL_NODE] = nodelist
4358

    
4359
    # in case of import lock the source node too
4360
    if self.op.mode == constants.INSTANCE_IMPORT:
4361
      src_node = getattr(self.op, "src_node", None)
4362
      src_path = getattr(self.op, "src_path", None)
4363

    
4364
      if src_path is None:
4365
        self.op.src_path = src_path = self.op.instance_name
4366

    
4367
      if src_node is None:
4368
        self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4369
        self.op.src_node = None
4370
        if os.path.isabs(src_path):
4371
          raise errors.OpPrereqError("Importing an instance from an absolute"
4372
                                     " path requires a source node option.")
4373
      else:
4374
        self.op.src_node = src_node = self._ExpandNode(src_node)
4375
        if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4376
          self.needed_locks[locking.LEVEL_NODE].append(src_node)
4377
        if not os.path.isabs(src_path):
4378
          self.op.src_path = src_path = \
4379
            os.path.join(constants.EXPORT_DIR, src_path)
4380

    
4381
    else: # INSTANCE_CREATE
4382
      if getattr(self.op, "os_type", None) is None:
4383
        raise errors.OpPrereqError("No guest OS specified")
4384

    
4385
  def _RunAllocator(self):
4386
    """Run the allocator based on input opcode.
4387

4388
    """
4389
    nics = [n.ToDict() for n in self.nics]
4390
    ial = IAllocator(self,
4391
                     mode=constants.IALLOCATOR_MODE_ALLOC,
4392
                     name=self.op.instance_name,
4393
                     disk_template=self.op.disk_template,
4394
                     tags=[],
4395
                     os=self.op.os_type,
4396
                     vcpus=self.be_full[constants.BE_VCPUS],
4397
                     mem_size=self.be_full[constants.BE_MEMORY],
4398
                     disks=self.disks,
4399
                     nics=nics,
4400
                     hypervisor=self.op.hypervisor,
4401
                     )
4402

    
4403
    ial.Run(self.op.iallocator)
4404

    
4405
    if not ial.success:
4406
      raise errors.OpPrereqError("Can't compute nodes using"
4407
                                 " iallocator '%s': %s" % (self.op.iallocator,
4408
                                                           ial.info))
4409
    if len(ial.nodes) != ial.required_nodes:
4410
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4411
                                 " of nodes (%s), required %s" %
4412
                                 (self.op.iallocator, len(ial.nodes),
4413
                                  ial.required_nodes))
4414
    self.op.pnode = ial.nodes[0]
4415
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4416
                 self.op.instance_name, self.op.iallocator,
4417
                 ", ".join(ial.nodes))
4418
    if ial.required_nodes == 2:
4419
      self.op.snode = ial.nodes[1]
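    # from here on the allocator result is indistinguishable from a
    # manually given node selection: the chosen nodes simply overwrite
    # self.op.pnode and self.op.snode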
4420

    
4421
  def BuildHooksEnv(self):
4422
    """Build hooks env.
4423

4424
    This runs on master, primary and secondary nodes of the instance.
4425

4426
    """
4427
    env = {
4428
      "ADD_MODE": self.op.mode,
4429
      }
4430
    if self.op.mode == constants.INSTANCE_IMPORT:
4431
      env["SRC_NODE"] = self.op.src_node
4432
      env["SRC_PATH"] = self.op.src_path
4433
      env["SRC_IMAGES"] = self.src_images
4434

    
4435
    env.update(_BuildInstanceHookEnv(
4436
      name=self.op.instance_name,
4437
      primary_node=self.op.pnode,
4438
      secondary_nodes=self.secondaries,
4439
      status=self.op.start,
4440
      os_type=self.op.os_type,
4441
      memory=self.be_full[constants.BE_MEMORY],
4442
      vcpus=self.be_full[constants.BE_VCPUS],
4443
      nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4444
      disk_template=self.op.disk_template,
4445
      disks=[(d["size"], d["mode"]) for d in self.disks],
4446
    ))
4447

    
4448
    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4449
          self.secondaries)
4450
    return env, nl, nl
4451

    
4452

    
4453
  def CheckPrereq(self):
4454
    """Check prerequisites.
4455

4456
    """
4457
    if (not self.cfg.GetVGName() and
4458
        self.op.disk_template not in constants.DTS_NOT_LVM):
4459
      raise errors.OpPrereqError("Cluster does not support lvm-based"
4460
                                 " instances")
4461

    
4462
    if self.op.mode == constants.INSTANCE_IMPORT:
4463
      src_node = self.op.src_node
4464
      src_path = self.op.src_path
4465

    
4466
      if src_node is None:
4467
        exp_list = self.rpc.call_export_list(
4468
          self.acquired_locks[locking.LEVEL_NODE])
4469
        found = False
4470
        for node in exp_list:
4471
          if not exp_list[node].failed and src_path in exp_list[node].data:
4472
            found = True
4473
            self.op.src_node = src_node = node
4474
            self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4475
                                                       src_path)
4476
            break
4477
        if not found:
4478
          raise errors.OpPrereqError("No export found for relative path %s" %
4479
                                      src_path)
4480

    
4481
      _CheckNodeOnline(self, src_node)
4482
      result = self.rpc.call_export_info(src_node, src_path)
4483
      result.Raise()
4484
      if not result.data:
4485
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
4486

    
4487
      export_info = result.data
4488
      if not export_info.has_section(constants.INISECT_EXP):
4489
        raise errors.ProgrammerError("Corrupted export config")
4490

    
4491
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
4492
      if (int(ei_version) != constants.EXPORT_VERSION):
4493
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4494
                                   (ei_version, constants.EXPORT_VERSION))
4495

    
4496
      # Check that the new instance doesn't have less disks than the export
4497
      instance_disks = len(self.disks)
4498
      export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4499
      if instance_disks < export_disks:
4500
        raise errors.OpPrereqError("Not enough disks to import."
4501
                                   " (instance: %d, export: %d)" %
4502
                                   (instance_disks, export_disks))
4503

    
4504
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4505
      disk_images = []
4506
      for idx in range(export_disks):
4507
        option = 'disk%d_dump' % idx
4508
        if export_info.has_option(constants.INISECT_INS, option):
4509
          # FIXME: are the old OSes, disk sizes, etc. useful?
4510
          export_name = export_info.get(constants.INISECT_INS, option)
4511
          image = os.path.join(src_path, export_name)
4512
          disk_images.append(image)
4513
        else:
4514
          disk_images.append(False)
4515

    
4516
      self.src_images = disk_images
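      # src_images now holds one entry per export disk: either the full
      # path of the dump file, or False if the export has no dump for that
      # index; it is consumed by call_instance_os_import in Exec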
4517

    
4518
      old_name = export_info.get(constants.INISECT_INS, 'name')
4519
      # FIXME: int() here could throw a ValueError on broken exports
4520
      exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4521
      if self.op.instance_name == old_name:
4522
        for idx, nic in enumerate(self.nics):
4523
          if nic.mac == constants.VALUE_AUTO and exp_nic_count > idx:
4524
            nic_mac_ini = 'nic%d_mac' % idx
4525
            nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4526

    
4527
    # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4528
    # ip ping checks (we use the same ip that was resolved in ExpandNames)
4529
    if self.op.start and not self.op.ip_check:
4530
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4531
                                 " adding an instance in start mode")
4532

    
4533
    if self.op.ip_check:
4534
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4535
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
4536
                                   (self.check_ip, self.op.instance_name))
4537

    
4538
    #### mac address generation
4539
    # By generating here the mac address both the allocator and the hooks get
4540
    # the real final mac address rather than the 'auto' or 'generate' value.
4541
    # There is a race condition between the generation and the instance object
4542
    # creation, which means that we know the mac is valid now, but we're not
4543
    # sure it will be when we actually add the instance. If things go bad
4544
    # adding the instance will abort because of a duplicate mac, and the
4545
    # creation job will fail.
4546
    for nic in self.nics:
4547
      if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4548
        nic.mac = self.cfg.GenerateMAC()
4549

    
4550
    #### allocator run
4551

    
4552
    if self.op.iallocator is not None:
4553
      self._RunAllocator()
4554

    
4555
    #### node related checks
4556

    
4557
    # check primary node
4558
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4559
    assert self.pnode is not None, \
4560
      "Cannot retrieve locked node %s" % self.op.pnode
4561
    if pnode.offline:
4562
      raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4563
                                 pnode.name)
4564
    if pnode.drained:
4565
      raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4566
                                 pnode.name)
4567

    
4568
    self.secondaries = []
4569

    
4570
    # mirror node verification
4571
    if self.op.disk_template in constants.DTS_NET_MIRROR:
4572
      if self.op.snode is None:
4573
        raise errors.OpPrereqError("The networked disk templates need"
4574
                                   " a mirror node")
4575
      if self.op.snode == pnode.name:
4576
        raise errors.OpPrereqError("The secondary node cannot be"
4577
                                   " the primary node.")
4578
      _CheckNodeOnline(self, self.op.snode)
4579
      _CheckNodeNotDrained(self, self.op.snode)
4580
      self.secondaries.append(self.op.snode)
4581

    
4582
    nodenames = [pnode.name] + self.secondaries
4583

    
4584
    req_size = _ComputeDiskSize(self.op.disk_template,
4585
                                self.disks)
4586

    
4587
    # Check lv size requirements
4588
    if req_size is not None:
4589
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4590
                                         self.op.hypervisor)
4591
      for node in nodenames:
4592
        info = nodeinfo[node]
4593
        info.Raise()
4594
        info = info.data
4595
        if not info:
4596
          raise errors.OpPrereqError("Cannot get current information"
4597
                                     " from node '%s'" % node)
4598
        vg_free = info.get('vg_free', None)
4599
        if not isinstance(vg_free, int):
4600
          raise errors.OpPrereqError("Can't compute free disk space on"
4601
                                     " node %s" % node)
4602
        if req_size > info['vg_free']:
4603
          raise errors.OpPrereqError("Not enough disk space on target node %s."
4604
                                     " %d MB available, %d MB required" %
4605
                                     (node, info['vg_free'], req_size))
4606

    
4607
    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4608

    
4609
    # os verification
4610
    result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4611
    result.Raise()
4612
    if not isinstance(result.data, objects.OS):
4613
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
4614
                                 " primary node"  % self.op.os_type)
4615

    
4616
    # bridge check on primary node
4617
    bridges = [n.bridge for n in self.nics]
4618
    result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4619
    result.Raise()
4620
    if not result.data:
4621
      raise errors.OpPrereqError("One of the target bridges '%s' does not"
4622
                                 " exist on destination node '%s'" %
4623
                                 (",".join(bridges), pnode.name))
4624

    
4625
    # memory check on primary node
4626
    if self.op.start:
4627
      _CheckNodeFreeMemory(self, self.pnode.name,
4628
                           "creating instance %s" % self.op.instance_name,
4629
                           self.be_full[constants.BE_MEMORY],
4630
                           self.op.hypervisor)
4631

    
4632
  def Exec(self, feedback_fn):
4633
    """Create and add the instance to the cluster.
4634

4635
    """
4636
    instance = self.op.instance_name
4637
    pnode_name = self.pnode.name
4638

    
4639
    ht_kind = self.op.hypervisor
4640
    if ht_kind in constants.HTS_REQ_PORT:
4641
      network_port = self.cfg.AllocatePort()
4642
    else:
4643
      network_port = None
4644

    
4645
    ##if self.op.vnc_bind_address is None:
4646
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4647

    
4648
    # this is needed because os.path.join does not accept None arguments
4649
    if self.op.file_storage_dir is None:
4650
      string_file_storage_dir = ""
4651
    else:
4652
      string_file_storage_dir = self.op.file_storage_dir
4653

    
4654
    # build the full file storage dir path
4655
    file_storage_dir = os.path.normpath(os.path.join(
4656
                                        self.cfg.GetFileStorageDir(),
4657
                                        string_file_storage_dir, instance))
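    # the resulting path is <cluster file storage dir>/<file_storage_dir
    # or "">/<instance name>, normalized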
4658

    
4659

    
4660
    disks = _GenerateDiskTemplate(self,
4661
                                  self.op.disk_template,
4662
                                  instance, pnode_name,
4663
                                  self.secondaries,
4664
                                  self.disks,
4665
                                  file_storage_dir,
4666
                                  self.op.file_driver,
4667
                                  0)
4668

    
4669
    iobj = objects.Instance(name=instance, os=self.op.os_type,
4670
                            primary_node=pnode_name,
4671
                            nics=self.nics, disks=disks,
4672
                            disk_template=self.op.disk_template,
4673
                            admin_up=False,
4674
                            network_port=network_port,
4675
                            beparams=self.op.beparams,
4676
                            hvparams=self.op.hvparams,
4677
                            hypervisor=self.op.hypervisor,
4678
                            )
4679

    
4680
    feedback_fn("* creating instance disks...")
4681
    try:
4682
      _CreateDisks(self, iobj)
4683
    except errors.OpExecError:
4684
      self.LogWarning("Device creation failed, reverting...")
4685
      try:
4686
        _RemoveDisks(self, iobj)
4687
      finally:
4688
        self.cfg.ReleaseDRBDMinors(instance)
4689
        raise
4690

    
4691
    feedback_fn("adding instance %s to cluster config" % instance)
4692

    
4693
    self.cfg.AddInstance(iobj)
4694
    # Declare that we don't want to remove the instance lock anymore, as we've
4695
    # added the instance to the config
4696
    del self.remove_locks[locking.LEVEL_INSTANCE]
4697
    # Unlock all the nodes
4698
    if self.op.mode == constants.INSTANCE_IMPORT:
4699
      nodes_keep = [self.op.src_node]
4700
      nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4701
                       if node != self.op.src_node]
4702
      self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4703
      self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4704
    else:
4705
      self.context.glm.release(locking.LEVEL_NODE)
4706
      del self.acquired_locks[locking.LEVEL_NODE]
4707

    
4708
    if self.op.wait_for_sync:
4709
      disk_abort = not _WaitForSync(self, iobj)
4710
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
4711
      # make sure the disks are not degraded (still sync-ing is ok)
4712
      time.sleep(15)
4713
      feedback_fn("* checking mirrors status")
4714
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4715
    else:
4716
      disk_abort = False
4717

    
4718
    if disk_abort:
4719
      _RemoveDisks(self, iobj)
4720
      self.cfg.RemoveInstance(iobj.name)
4721
      # Make sure the instance lock gets removed
4722
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4723
      raise errors.OpExecError("There are some degraded disks for"
4724
                               " this instance")
4725

    
4726
    feedback_fn("creating os for instance %s on node %s" %
4727
                (instance, pnode_name))
4728

    
4729
    if iobj.disk_template != constants.DT_DISKLESS:
4730
      if self.op.mode == constants.INSTANCE_CREATE:
4731
        feedback_fn("* running the instance OS create scripts...")
4732
        result = self.rpc.call_instance_os_add(pnode_name, iobj)
4733
        msg = result.RemoteFailMsg()
4734
        if msg:
4735
          raise errors.OpExecError("Could not add os for instance %s"
4736
                                   " on node %s: %s" %
4737
                                   (instance, pnode_name, msg))
4738

    
4739
      elif self.op.mode == constants.INSTANCE_IMPORT:
4740
        feedback_fn("* running the instance OS import scripts...")
4741
        src_node = self.op.src_node
4742
        src_images = self.src_images
4743
        cluster_name = self.cfg.GetClusterName()
4744
        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4745
                                                         src_node, src_images,
4746
                                                         cluster_name)
4747
        import_result.Raise()
4748
        for idx, result in enumerate(import_result.data):
4749
          if not result:
4750
            self.LogWarning("Could not import the image %s for instance"
4751
                            " %s, disk %d, on node %s" %
4752
                            (src_images[idx], instance, idx, pnode_name))
4753
      else:
4754
        # also checked in the prereq part
4755
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4756
                                     % self.op.mode)
4757

    
4758
    if self.op.start:
4759
      iobj.admin_up = True
4760
      self.cfg.Update(iobj)
4761
      logging.info("Starting instance %s on node %s", instance, pnode_name)
4762
      feedback_fn("* starting instance...")
4763
      result = self.rpc.call_instance_start(pnode_name, iobj)
4764
      msg = result.RemoteFailMsg()
4765
      if msg:
4766
        raise errors.OpExecError("Could not start instance: %s" % msg)


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = self.rpc.call_instance_list([node],
                                             [instance.hypervisor])[node]
    node_insts.Raise()

    if instance.name not in node_insts.data:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logging.debug("Connecting to console of %s on %s", instance.name, node)

    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    cluster = self.cfg.GetClusterInfo()
    # beparams and hvparams are passed separately, to avoid editing the
    # instance and then saving the defaults in the instance itself.
    hvparams = cluster.FillHV(instance)
    beparams = cluster.FillBE(instance)
    console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)


class LUReplaceDisks(LogicalUnit):
4823
  """Replace the disks of an instance.
4824

4825
  """
4826
  HPATH = "mirrors-replace"
4827
  HTYPE = constants.HTYPE_INSTANCE
4828
  _OP_REQP = ["instance_name", "mode", "disks"]
4829
  REQ_BGL = False
4830

    
4831
  def CheckArguments(self):
4832
    if not hasattr(self.op, "remote_node"):
4833
      self.op.remote_node = None
4834
    if not hasattr(self.op, "iallocator"):
4835
      self.op.iallocator = None
4836

    
4837
    # check for valid parameter combination
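    # - when changing the secondary (REPLACE_DISK_CHG), exactly one of
    #   remote_node and iallocator must be given
    # - in the other replace modes neither may be given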
4838
    cnt = [self.op.remote_node, self.op.iallocator].count(None)
4839
    if self.op.mode == constants.REPLACE_DISK_CHG:
4840
      if cnt == 2:
4841
        raise errors.OpPrereqError("When changing the secondary either an"
4842
                                   " iallocator script must be used or the"
4843
                                   " new node given")
4844
      elif cnt == 0:
4845
        raise errors.OpPrereqError("Give either the iallocator or the new"
4846
                                   " secondary, not both")
4847
    else: # not replacing the secondary
4848
      if cnt != 2:
4849
        raise errors.OpPrereqError("The iallocator and new node options can"
4850
                                   " be used only when changing the"
4851
                                   " secondary node")
4852

    
4853
  def ExpandNames(self):
4854
    self._ExpandAndLockInstance()
4855

    
4856
    if self.op.iallocator is not None:
4857
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4858
    elif self.op.remote_node is not None:
4859
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4860
      if remote_node is None:
4861
        raise errors.OpPrereqError("Node '%s' not known" %
4862
                                   self.op.remote_node)
4863
      self.op.remote_node = remote_node
4864
      # Warning: do not remove the locking of the new secondary here
4865
      # unless DRBD8.AddChildren is changed to work in parallel;
4866
      # currently it doesn't since parallel invocations of
4867
      # FindUnusedMinor will conflict
4868
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4869
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4870
    else:
4871
      self.needed_locks[locking.LEVEL_NODE] = []
4872
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4873

    
4874
  def DeclareLocks(self, level):
4875
    # If we're not already locking all nodes in the set we have to declare the
4876
    # instance's primary/secondary nodes.
4877
    if (level == locking.LEVEL_NODE and
4878
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4879
      self._LockInstancesNodes()
4880

    
4881
  def _RunAllocator(self):
4882
    """Compute a new secondary node using an IAllocator.
4883

4884
    """
4885
    ial = IAllocator(self,
4886
                     mode=constants.IALLOCATOR_MODE_RELOC,
4887
                     name=self.op.instance_name,
4888
                     relocate_from=[self.sec_node])
4889

    
4890
    ial.Run(self.op.iallocator)
4891

    
4892
    if not ial.success:
4893
      raise errors.OpPrereqError("Can't compute nodes using"
4894
                                 " iallocator '%s': %s" % (self.op.iallocator,
4895
                                                           ial.info))
4896
    if len(ial.nodes) != ial.required_nodes:
4897
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4898
                                 " of nodes (%s), required %s" %
4899
                                 (len(ial.nodes), ial.required_nodes))
4900
    self.op.remote_node = ial.nodes[0]
4901
    self.LogInfo("Selected new secondary for the instance: %s",
4902
                 self.op.remote_node)
4903

    
4904
  def BuildHooksEnv(self):
4905
    """Build hooks env.
4906

4907
    This runs on the master, the primary and all the secondaries.
4908

4909
    """
4910
    env = {
4911
      "MODE": self.op.mode,
4912
      "NEW_SECONDARY": self.op.remote_node,
4913
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
4914
      }
4915
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4916
    nl = [
4917
      self.cfg.GetMasterNode(),
4918
      self.instance.primary_node,
4919
      ]
4920
    if self.op.remote_node is not None:
4921
      nl.append(self.op.remote_node)
4922
    return env, nl, nl
4923

    
4924
  def CheckPrereq(self):
4925
    """Check prerequisites.
4926

4927
    This checks that the instance is in the cluster.
4928

4929
    """
4930
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4931
    assert instance is not None, \
4932
      "Cannot retrieve locked instance %s" % self.op.instance_name
4933
    self.instance = instance
4934

    
4935
    if instance.disk_template != constants.DT_DRBD8:
4936
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
4937
                                 " instances")
4938

    
4939
    if len(instance.secondary_nodes) != 1:
4940
      raise errors.OpPrereqError("The instance has a strange layout,"
4941
                                 " expected one secondary but found %d" %
4942
                                 len(instance.secondary_nodes))
4943

    
4944
    self.sec_node = instance.secondary_nodes[0]
4945

    
4946
    if self.op.iallocator is not None:
4947
      self._RunAllocator()
4948

    
4949
    remote_node = self.op.remote_node
4950
    if remote_node is not None:
4951
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4952
      assert self.remote_node_info is not None, \
4953
        "Cannot retrieve locked node %s" % remote_node
4954
    else:
4955
      self.remote_node_info = None
4956
    if remote_node == instance.primary_node:
4957
      raise errors.OpPrereqError("The specified node is the primary node of"
4958
                                 " the instance.")
4959
    elif remote_node == self.sec_node:
4960
      raise errors.OpPrereqError("The specified node is already the"
4961
                                 " secondary node of the instance.")
4962

    
4963
    if self.op.mode == constants.REPLACE_DISK_PRI:
4964
      n1 = self.tgt_node = instance.primary_node
4965
      n2 = self.oth_node = self.sec_node
4966
    elif self.op.mode == constants.REPLACE_DISK_SEC:
4967
      n1 = self.tgt_node = self.sec_node
4968
      n2 = self.oth_node = instance.primary_node
4969
    elif self.op.mode == constants.REPLACE_DISK_CHG:
4970
      n1 = self.new_node = remote_node
4971
      n2 = self.oth_node = instance.primary_node
4972
      self.tgt_node = self.sec_node
4973
      _CheckNodeNotDrained(self, remote_node)
4974
    else:
4975
      raise errors.ProgrammerError("Unhandled disk replace mode")
4976

    
4977
    _CheckNodeOnline(self, n1)
4978
    _CheckNodeOnline(self, n2)
4979

    
4980
    if not self.op.disks:
4981
      self.op.disks = range(len(instance.disks))
4982

    
4983
    for disk_idx in self.op.disks:
4984
      instance.FindDisk(disk_idx)
4985

    
4986
  def _ExecD8DiskOnly(self, feedback_fn):
4987
    """Replace a disk on the primary or secondary for dbrd8.
4988

4989
    The algorithm for replace is quite complicated:
4990

4991
      1. for each disk to be replaced:
4992

4993
        1. create new LVs on the target node with unique names
4994
        1. detach old LVs from the drbd device
4995
        1. rename old LVs to name_replaced.<time_t>
4996
        1. rename new LVs to old LVs
4997
        1. attach the new LVs (with the old names now) to the drbd device
4998

4999
      1. wait for sync across all devices
5000

5001
      1. for each modified disk:
5002

5003
        1. remove old LVs (which have the name name_replaced.<time_t>)
5004

5005
    Failures are not very well handled.
5006

5007
    """
5008
    steps_total = 6
5009
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5010
    instance = self.instance
5011
    iv_names = {}
5012
    vgname = self.cfg.GetVGName()
5013
    # start of work
5014
    cfg = self.cfg
5015
    tgt_node = self.tgt_node
5016
    oth_node = self.oth_node
5017

    
5018
    # Step: check device activation
5019
    self.proc.LogStep(1, steps_total, "check device existence")
5020
    info("checking volume groups")
5021
    my_vg = cfg.GetVGName()
5022
    results = self.rpc.call_vg_list([oth_node, tgt_node])
5023
    if not results:
5024
      raise errors.OpExecError("Can't list volume groups on the nodes")
5025
    for node in oth_node, tgt_node:
5026
      res = results[node]
5027
      if res.failed or not res.data or my_vg not in res.data:
5028
        raise errors.OpExecError("Volume group '%s' not found on %s" %
5029
                                 (my_vg, node))
5030
    for idx, dev in enumerate(instance.disks):
5031
      if idx not in self.op.disks:
5032
        continue
5033
      for node in tgt_node, oth_node:
5034
        info("checking disk/%d on %s" % (idx, node))
5035
        cfg.SetDiskID(dev, node)
5036
        result = self.rpc.call_blockdev_find(node, dev)
5037
        msg = result.RemoteFailMsg()
5038
        if not msg and not result.payload:
5039
          msg = "disk not found"
5040
        if msg:
5041
          raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5042
                                   (idx, node, msg))
5043

    
5044
    # Step: check other node consistency
5045
    self.proc.LogStep(2, steps_total, "check peer consistency")
5046
    for idx, dev in enumerate(instance.disks):
5047
      if idx not in self.op.disks:
5048
        continue
5049
      info("checking disk/%d consistency on %s" % (idx, oth_node))
5050
      if not _CheckDiskConsistency(self, dev, oth_node,
5051
                                   oth_node==instance.primary_node):
5052
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5053
                                 " to replace disks on this node (%s)" %
5054
                                 (oth_node, tgt_node))
5055

    
5056
    # Step: create new storage
5057
    self.proc.LogStep(3, steps_total, "allocate new storage")
5058
    for idx, dev in enumerate(instance.disks):
5059
      if idx not in self.op.disks:
5060
        continue
5061
      size = dev.size
5062
      cfg.SetDiskID(dev, tgt_node)
5063
      lv_names = [".disk%d_%s" % (idx, suf)
5064
                  for suf in ["data", "meta"]]
5065
      names = _GenerateUniqueNames(self, lv_names)
5066
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5067
                             logical_id=(vgname, names[0]))
5068
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5069
                             logical_id=(vgname, names[1]))
5070
      new_lvs = [lv_data, lv_meta]
5071
      old_lvs = dev.children
5072
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5073
      info("creating new local storage on %s for %s" %
5074
           (tgt_node, dev.iv_name))
5075
      # we pass force_create=True to force the LVM creation
5076
      for new_lv in new_lvs:
5077
        _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5078
                        _GetInstanceInfoText(instance), False)
5079

    
5080
    # Step: for each lv, detach+rename*2+attach
5081
    self.proc.LogStep(4, steps_total, "change drbd configuration")
5082
    for dev, old_lvs, new_lvs in iv_names.itervalues():
5083
      info("detaching %s drbd from local storage" % dev.iv_name)
5084
      result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5085
      result.Raise()
5086
      if not result.data:
5087
        raise errors.OpExecError("Can't detach drbd from local storage on node"
5088
                                 " %s for device %s" % (tgt_node, dev.iv_name))
5089
      #dev.children = []
5090
      #cfg.Update(instance)
5091

    
5092
      # ok, we created the new LVs, so now we know we have the needed
5093
      # storage; as such, we proceed on the target node to rename
5094
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5095
      # using the assumption that logical_id == physical_id (which in
5096
      # turn is the unique_id on that node)
5097

    
5098
      # FIXME(iustin): use a better name for the replaced LVs
5099
      temp_suffix = int(time.time())
5100
      ren_fn = lambda d, suff: (d.physical_id[0],
5101
                                d.physical_id[1] + "_replaced-%s" % suff)
5102
      # build the rename list based on what LVs exist on the node
5103
      rlist = []
5104
      for to_ren in old_lvs:
5105
        result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5106
        if not result.RemoteFailMsg() and result.payload:
5107
          # device exists
5108
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5109

    
5110
      info("renaming the old LVs on the target node")
5111
      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5112
      result.Raise()
5113
      if not result.data:
5114
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
5115
      # now we rename the new LVs to the old LVs
5116
      info("renaming the new LVs on the target node")
5117
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5118
      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5119
      result.Raise()
5120
      if not result.data:
5121
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
5122

    
5123
      for old, new in zip(old_lvs, new_lvs):
5124
        new.logical_id = old.logical_id
5125
        cfg.SetDiskID(new, tgt_node)
5126

    
5127
      for disk in old_lvs:
5128
        disk.logical_id = ren_fn(disk, temp_suffix)
5129
        cfg.SetDiskID(disk, tgt_node)
5130

    
5131
      # now that the new lvs have the old name, we can add them to the device
5132
      info("adding new mirror component on %s" % tgt_node)
5133
      result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5134
      if result.failed or not result.data:
5135
        for new_lv in new_lvs:
5136
          msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5137
          if msg:
5138
            warning("Can't rollback device %s: %s", dev, msg,
5139
                    hint="cleanup manually the unused logical volumes")
5140
        raise errors.OpExecError("Can't add local storage to drbd")
5141

    
5142
      dev.children = new_lvs
5143
      cfg.Update(instance)
5144

    
5145
    # Step: wait for sync
5146

    
5147
    # this can fail as the old devices are degraded and _WaitForSync
5148
    # does a combined result over all disks, so we don't check its
5149
    # return value
5150
    self.proc.LogStep(5, steps_total, "sync devices")
5151
    _WaitForSync(self, instance, unlock=True)
5152

    
5153
    # so check manually all the devices
5154
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5155
      cfg.SetDiskID(dev, instance.primary_node)
5156
      result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5157
      msg = result.RemoteFailMsg()
5158
      if not msg and not result.payload:
5159
        msg = "disk not found"
5160
      if msg:
5161
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
5162
                                 (name, msg))
5163
      if result.payload[5]:
5164
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
5165

    
5166
    # Step: remove old storage
5167
    self.proc.LogStep(6, steps_total, "removing old storage")
5168
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5169
      info("remove logical volumes for %s" % name)
5170
      for lv in old_lvs:
5171
        cfg.SetDiskID(lv, tgt_node)
5172
        msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5173
        if msg:
5174
          warning("Can't remove old LV: %s" % msg,
5175
                  hint="manually remove unused LVs")
5176
          continue
5177

    
5178
  def _ExecD8Secondary(self, feedback_fn):
5179
    """Replace the secondary node for drbd8.
5180

5181
    The algorithm for replace is quite complicated:
5182
      - for all disks of the instance:
5183
        - create new LVs on the new node with same names
5184
        - shutdown the drbd device on the old secondary
5185
        - disconnect the drbd network on the primary
5186
        - create the drbd device on the new secondary
5187
        - network attach the drbd on the primary, using an artifice:
5188
          the drbd code for Attach() will connect to the network if it
5189
          finds a device which is connected to the good local disks but
5190
          not network enabled
5191
      - wait for sync across all devices
5192
      - remove all disks from the old secondary
5193

5194
    Failures are not very well handled.
5195

5196
    """
5197
    steps_total = 6
5198
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5199
    instance = self.instance
5200
    iv_names = {}
5201
    # start of work
5202
    cfg = self.cfg
5203
    old_node = self.tgt_node
5204
    new_node = self.new_node
5205
    pri_node = instance.primary_node
5206
    nodes_ip = {
5207
      old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5208
      new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5209
      pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5210
      }
5211

    
5212
    # Step: check device activation
5213
    self.proc.LogStep(1, steps_total, "check device existence")
5214
    info("checking volume groups")
5215
    my_vg = cfg.GetVGName()
5216
    results = self.rpc.call_vg_list([pri_node, new_node])
5217
    for node in pri_node, new_node:
5218
      res = results[node]
5219
      if res.failed or not res.data or my_vg not in res.data:
5220
        raise errors.OpExecError("Volume group '%s' not found on %s" %
5221
                                 (my_vg, node))
5222
    for idx, dev in enumerate(instance.disks):
5223
      if idx not in self.op.disks:
5224
        continue
5225
      info("checking disk/%d on %s" % (idx, pri_node))
5226
      cfg.SetDiskID(dev, pri_node)
5227
      result = self.rpc.call_blockdev_find(pri_node, dev)
5228
      msg = result.RemoteFailMsg()
5229
      if not msg and not result.payload:
5230
        msg = "disk not found"
5231
      if msg:
5232
        raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5233
                                 (idx, pri_node, msg))
5234

    
5235
    # Step: check other node consistency
5236
    self.proc.LogStep(2, steps_total, "check peer consistency")
5237
    for idx, dev in enumerate(instance.disks):
5238
      if idx not in self.op.disks:
5239
        continue
5240
      info("checking disk/%d consistency on %s" % (idx, pri_node))
5241
      if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5242
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
5243
                                 " unsafe to replace the secondary" %
5244
                                 pri_node)
5245

    
5246
    # Step: create new storage
5247
    self.proc.LogStep(3, steps_total, "allocate new storage")
5248
    for idx, dev in enumerate(instance.disks):
5249
      info("adding new local storage on %s for disk/%d" %
5250
           (new_node, idx))
5251
      # we pass force_create=True to force LVM creation
5252
      for new_lv in dev.children:
5253
        _CreateBlockDev(self, new_node, instance, new_lv, True,
5254
                        _GetInstanceInfoText(instance), False)
5255

    
5256
    # Step 4: drbd minors and drbd setup changes
5257
    # after this, we must manually remove the drbd minors on both the
5258
    # error and the success paths
5259
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5260
                                   instance.name)
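    # one new minor is allocated on new_node for every disk of the
    # instance; the list comprehension just repeats new_node once per disk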
5261
    logging.debug("Allocated minors %s" % (minors,))
5262
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
5263
    for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5264
      size = dev.size
5265
      info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5266
      # create new devices on new_node; note that we create two IDs:
5267
      # one without port, so the drbd will be activated without
5268
      # networking information on the new node at this stage, and one
5269
      # with network, for the later activation in step 4
5270
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5271
      if pri_node == o_node1:
5272
        p_minor = o_minor1
5273
      else:
5274
        p_minor = o_minor2
5275

    
5276
      new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5277
      new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5278

    
5279
      iv_names[idx] = (dev, dev.children, new_net_id)
5280
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5281
                    new_net_id)
5282
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5283
                              logical_id=new_alone_id,
5284
                              children=dev.children)
5285
      try:
5286
        _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5287
                              _GetInstanceInfoText(instance), False)
5288
      except errors.BlockDeviceError:
5289
        self.cfg.ReleaseDRBDMinors(instance.name)
5290
        raise
5291

    
5292
    for idx, dev in enumerate(instance.disks):
5293
      # we have new devices, shutdown the drbd on the old secondary
5294
      info("shutting down drbd for disk/%d on old node" % idx)
5295
      cfg.SetDiskID(dev, old_node)
5296
      msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5297
      if msg:
5298
        warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5299
                (idx, msg),
5300
                hint="Please cleanup this device manually as soon as possible")
5301

    
5302
    info("detaching primary drbds from the network (=> standalone)")
5303
    result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5304
                                               instance.disks)[pri_node]
5305

    
5306
    msg = result.RemoteFailMsg()
5307
    if msg:
5308
      # detaches didn't succeed (unlikely)
5309
      self.cfg.ReleaseDRBDMinors(instance.name)
5310
      raise errors.OpExecError("Can't detach the disks from the network on"
5311
                               " old node: %s" % (msg,))
5312

    
5313
    # if we managed to detach at least one, we update all the disks of
5314
    # the instance to point to the new secondary
5315
    info("updating instance configuration")
5316
    for dev, _, new_logical_id in iv_names.itervalues():
5317
      dev.logical_id = new_logical_id
5318
      cfg.SetDiskID(dev, pri_node)
5319
    cfg.Update(instance)
5320

    
5321
    # and now perform the drbd attach
5322
    info("attaching primary drbds to new secondary (standalone => connected)")
5323
    result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5324
                                           instance.disks, instance.name,
5325
                                           False)
5326
    for to_node, to_result in result.items():
5327
      msg = to_result.RemoteFailMsg()
5328
      if msg:
5329
        warning("can't attach drbd disks on node %s: %s", to_node, msg,
5330
                hint="please do a gnt-instance info to see the"
5331
                " status of disks")
5332

    
5333
    # this can fail as the old devices are degraded and _WaitForSync
5334
    # does a combined result over all disks, so we don't check its
5335
    # return value
5336
    self.proc.LogStep(5, steps_total, "sync devices")
5337
    _WaitForSync(self, instance, unlock=True)
5338

    
5339
    # so check manually all the devices
5340
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
5341
      cfg.SetDiskID(dev, pri_node)
5342
      result = self.rpc.call_blockdev_find(pri_node, dev)
5343
      msg = result.RemoteFailMsg()
5344
      if not msg and not result.payload:
5345
        msg = "disk not found"
5346
      if msg:
5347
        raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5348
                                 (idx, msg))
5349
      if result.payload[5]:
5350
        raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5351

    
5352
    self.proc.LogStep(6, steps_total, "removing old storage")
5353
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
5354
      info("remove logical volumes for disk/%d" % idx)
5355
      for lv in old_lvs:
5356
        cfg.SetDiskID(lv, old_node)
5357
        msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5358
        if msg:
5359
          warning("Can't remove LV on old secondary: %s", msg,
5360
                  hint="Cleanup stale volumes by hand")
5361

    
5362
  def Exec(self, feedback_fn):
5363
    """Execute disk replacement.
5364

5365
    This dispatches the disk replacement to the appropriate handler.
5366

5367
    """
5368
    instance = self.instance
5369

    
5370
    # Activate the instance disks if we're replacing them on a down instance
5371
    if not instance.admin_up:
5372
      _StartInstanceDisks(self, instance, True)
5373

    
5374
    if self.op.mode == constants.REPLACE_DISK_CHG:
5375
      fn = self._ExecD8Secondary
5376
    else:
5377
      fn = self._ExecD8DiskOnly
5378

    
5379
    ret = fn(feedback_fn)
5380

    
5381
    # Deactivate the instance disks if we're replacing them on a down instance
5382
    if not instance.admin_up:
5383
      _SafeShutdownInstanceDisks(self, instance)
5384

    
5385
    return ret


class LUGrowDisk(LogicalUnit):
5389
  """Grow a disk of an instance.
5390

5391
  """
5392
  HPATH = "disk-grow"
5393
  HTYPE = constants.HTYPE_INSTANCE
5394
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5395
  REQ_BGL = False
5396

    
5397
  def ExpandNames(self):
5398
    self._ExpandAndLockInstance()
5399
    self.needed_locks[locking.LEVEL_NODE] = []
5400
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5401

    
5402
  def DeclareLocks(self, level):
5403
    if level == locking.LEVEL_NODE:
5404
      self._LockInstancesNodes()
5405

    
5406
  def BuildHooksEnv(self):
5407
    """Build hooks env.
5408

5409
    This runs on the master, the primary and all the secondaries.
5410

5411
    """
5412
    env = {
5413
      "DISK": self.op.disk,
5414
      "AMOUNT": self.op.amount,
5415
      }
5416
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5417
    nl = [
5418
      self.cfg.GetMasterNode(),
5419
      self.instance.primary_node,
5420
      ]
5421
    return env, nl, nl
5422

    
5423
  def CheckPrereq(self):
5424
    """Check prerequisites.
5425

5426
    This checks that the instance is in the cluster.
5427

5428
    """
5429
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5430
    assert instance is not None, \
5431
      "Cannot retrieve locked instance %s" % self.op.instance_name
5432
    nodenames = list(instance.all_nodes)
5433
    for node in nodenames:
5434
      _CheckNodeOnline(self, node)
5435

    
5436

    
5437
    self.instance = instance
5438

    
5439
    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5440
      raise errors.OpPrereqError("Instance's disk layout does not support"
5441
                                 " growing.")
5442

    
5443
    self.disk = instance.FindDisk(self.op.disk)
5444

    
5445
    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5446
                                       instance.hypervisor)
5447
    for node in nodenames:
5448
      info = nodeinfo[node]
5449
      if info.failed or not info.data:
5450
        raise errors.OpPrereqError("Cannot get current information"
5451
                                   " from node '%s'" % node)
5452
      vg_free = info.data.get('vg_free', None)
5453
      if not isinstance(vg_free, int):
5454
        raise errors.OpPrereqError("Can't compute free disk space on"
5455
                                   " node %s" % node)
5456
      if self.op.amount > vg_free:
5457
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
5458
                                   " %d MiB available, %d MiB required" %
5459
                                   (node, vg_free, self.op.amount))
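    # note that the growth must fit on every node holding the disk (the
    # primary and, for DRBD, the secondary), since each of them gets the
    # same blockdev_grow call in Exec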
5460

    
5461
  def Exec(self, feedback_fn):
5462
    """Execute disk grow.
5463

5464
    """
5465
    instance = self.instance
5466
    disk = self.disk
5467
    for node in instance.all_nodes:
5468
      self.cfg.SetDiskID(disk, node)
5469
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5470
      msg = result.RemoteFailMsg()
5471
      if msg:
5472
        raise errors.OpExecError("Grow request failed to node %s: %s" %
5473
                                 (node, msg))
5474
    disk.RecordGrow(self.op.amount)
5475
    self.cfg.Update(instance)
5476
    if self.op.wait_for_sync:
5477
      disk_abort = not _WaitForSync(self, instance)
5478
      if disk_abort:
5479
        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5480
                             " status.\nPlease check the instance.")


class LUQueryInstanceData(NoHooksLU):
5484
  """Query runtime instance data.
5485

5486
  """
5487
  _OP_REQP = ["instances", "static"]
5488
  REQ_BGL = False
5489

    
5490
  def ExpandNames(self):
5491
    self.needed_locks = {}
5492
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5493

    
5494
    if not isinstance(self.op.instances, list):
5495
      raise errors.OpPrereqError("Invalid argument type 'instances'")
5496

    
5497
    if self.op.instances:
5498
      self.wanted_names = []
5499
      for name in self.op.instances:
5500
        full_name = self.cfg.ExpandInstanceName(name)
5501
        if full_name is None:
5502
          raise errors.OpPrereqError("Instance '%s' not known" % name)
5503
        self.wanted_names.append(full_name)
5504
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5505
    else:
5506
      self.wanted_names = None
5507
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5508

    
5509
    self.needed_locks[locking.LEVEL_NODE] = []
5510
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5511

    
5512
  def DeclareLocks(self, level):
5513
    if level == locking.LEVEL_NODE:
5514
      self._LockInstancesNodes()
5515

    
5516
  def CheckPrereq(self):
5517
    """Check prerequisites.
5518

5519
    This only checks the optional instance list against the existing names.
5520

5521
    """
5522
    if self.wanted_names is None:
5523
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5524

    
5525
    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5526
                             in self.wanted_names]
5527
    return
5528

    
5529
  def _ComputeDiskStatus(self, instance, snode, dev):
5530
    """Compute block device status.
5531

5532
    """
5533
    static = self.op.static
5534
    if not static:
5535
      self.cfg.SetDiskID(dev, instance.primary_node)
5536
      dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5537
      if dev_pstatus.offline:
5538
        dev_pstatus = None
5539
      else:
5540
        msg = dev_pstatus.RemoteFailMsg()
5541
        if msg:
5542
          raise errors.OpExecError("Can't compute disk status for %s: %s" %
5543
                                   (instance.name, msg))
5544
        dev_pstatus = dev_pstatus.payload
5545
    else:
5546
      dev_pstatus = None
5547

    
5548
    if dev.dev_type in constants.LDS_DRBD:
5549
      # we change the snode then (otherwise we use the one passed in)
5550
      if dev.logical_id[0] == instance.primary_node:
5551
        snode = dev.logical_id[1]
5552
      else:
5553
        snode = dev.logical_id[0]
5554

    
5555
    if snode and not static:
5556
      self.cfg.SetDiskID(dev, snode)
5557
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5558
      if dev_sstatus.offline:
5559
        dev_sstatus = None
5560
      else:
5561
        msg = dev_sstatus.RemoteFailMsg()
5562
        if msg:
5563
          raise errors.OpExecError("Can't compute disk status for %s: %s" %
5564
                                   (instance.name, msg))
5565
        dev_sstatus = dev_sstatus.payload
5566
    else:
5567
      dev_sstatus = None
5568

    
5569
    if dev.children:
5570
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
5571
                      for child in dev.children]
5572
    else:
5573
      dev_children = []
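    # pstatus/sstatus below are the blockdev_find payloads (or None when the
    # query is static or the node is offline); children is built recursively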
5574

    
5575
    data = {
5576
      "iv_name": dev.iv_name,
5577
      "dev_type": dev.dev_type,
5578
      "logical_id": dev.logical_id,
5579
      "physical_id": dev.physical_id,
5580
      "pstatus": dev_pstatus,
5581
      "sstatus": dev_sstatus,
5582
      "children": dev_children,
5583
      "mode": dev.mode,
5584
      }
5585

    
5586
    return data
5587

    
5588
  def Exec(self, feedback_fn):
5589
    """Gather and return data"""
5590
    result = {}
5591

    
5592
    cluster = self.cfg.GetClusterInfo()
5593

    
5594
    for instance in self.wanted_instances:
5595
      if not self.op.static:
5596
        remote_info = self.rpc.call_instance_info(instance.primary_node,
5597
                                                  instance.name,
5598
                                                  instance.hypervisor)
5599
        remote_info.Raise()
5600
        remote_info = remote_info.data
5601
        if remote_info and "state" in remote_info:
5602
          remote_state = "up"
5603
        else:
5604
          remote_state = "down"
5605
      else:
5606
        remote_state = None
5607
      if instance.admin_up:
5608
        config_state = "up"
5609
      else:
5610
        config_state = "down"
5611

    
5612
      disks = [self._ComputeDiskStatus(instance, None, device)
5613
               for device in instance.disks]
5614

    
5615
      idict = {
5616
        "name": instance.name,
5617
        "config_state": config_state,
5618
        "run_state": remote_state,
5619
        "pnode": instance.primary_node,
5620
        "snodes": instance.secondary_nodes,
5621
        "os": instance.os,
5622
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5623
        "disks": disks,
5624
        "hypervisor": instance.hypervisor,
5625
        "network_port": instance.network_port,
5626
        "hv_instance": instance.hvparams,
5627
        "hv_actual": cluster.FillHV(instance),
5628
        "be_instance": instance.beparams,
5629
        "be_actual": cluster.FillBE(instance),
5630
        }
5631

    
5632
      result[instance.name] = idict
5633

    
5634
    return result


class LUSetInstanceParams(LogicalUnit):
5638
  """Modifies an instances's parameters.
5639

5640
  """
5641
  HPATH = "instance-modify"
5642
  HTYPE = constants.HTYPE_INSTANCE
5643
  _OP_REQP = ["instance_name"]
5644
  REQ_BGL = False
5645

    
5646
  def CheckArguments(self):
5647
    if not hasattr(self.op, 'nics'):
5648
      self.op.nics = []
5649
    if not hasattr(self.op, 'disks'):
5650
      self.op.disks = []
5651
    if not hasattr(self.op, 'beparams'):
5652
      self.op.beparams = {}
5653
    if not hasattr(self.op, 'hvparams'):
5654
      self.op.hvparams = {}
5655
    self.op.force = getattr(self.op, "force", False)
5656
    if not (self.op.nics or self.op.disks or
5657
            self.op.hvparams or self.op.beparams):
5658
      raise errors.OpPrereqError("No changes submitted")
5659

    
5660
    # Disk validation
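    # each entry in self.op.disks is a (disk_op, disk_dict) pair: disk_op is
    # either an existing disk index (modify) or DDM_ADD/DDM_REMOVE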
5661
    disk_addremove = 0
5662
    for disk_op, disk_dict in self.op.disks:
5663
      if disk_op == constants.DDM_REMOVE:
5664
        disk_addremove += 1
5665
        continue
5666
      elif disk_op == constants.DDM_ADD:
5667
        disk_addremove += 1
5668
      else:
5669
        if not isinstance(disk_op, int):
5670
          raise errors.OpPrereqError("Invalid disk index")
5671
      if disk_op == constants.DDM_ADD:
5672
        mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5673
        if mode not in constants.DISK_ACCESS_SET:
5674
          raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5675
        size = disk_dict.get('size', None)
5676
        if size is None:
5677
          raise errors.OpPrereqError("Required disk parameter size missing")
5678
        try:
5679
          size = int(size)
5680
        except ValueError, err:
5681
          raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5682
                                     str(err))
5683
        disk_dict['size'] = size
5684
      else:
5685
        # modification of disk
5686
        if 'size' in disk_dict:
5687
          raise errors.OpPrereqError("Disk size change not possible, use"
5688
                                     " grow-disk")
5689

    
5690
    if disk_addremove > 1:
5691
      raise errors.OpPrereqError("Only one disk add or remove operation"
5692
                                 " supported at a time")
5693

    
5694
    # NIC validation
5695
    nic_addremove = 0
5696
    for nic_op, nic_dict in self.op.nics:
5697
      if nic_op == constants.DDM_REMOVE:
5698
        nic_addremove += 1
5699
        continue
5700
      elif nic_op == constants.DDM_ADD:
5701
        nic_addremove += 1
5702
      else:
5703
        if not isinstance(nic_op, int):
5704
          raise errors.OpPrereqError("Invalid nic index")
5705

    
5706
      # nic_dict should be a dict
5707
      nic_ip = nic_dict.get('ip', None)
5708
      if nic_ip is not None:
5709
        if nic_ip.lower() == constants.VALUE_NONE:
5710
          nic_dict['ip'] = None
5711
        else:
5712
          if not utils.IsValidIP(nic_ip):
5713
            raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5714

    
5715
      if nic_op == constants.DDM_ADD:
5716
        nic_bridge = nic_dict.get('bridge', None)
5717
        if nic_bridge is None:
5718
          nic_dict['bridge'] = self.cfg.GetDefBridge()
5719
        nic_mac = nic_dict.get('mac', None)
5720
        if nic_mac is None:
5721
          nic_dict['mac'] = constants.VALUE_AUTO
5722

    
5723
      if 'mac' in nic_dict:
5724
        nic_mac = nic_dict['mac']
5725
        if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5726
          if not utils.IsValidMac(nic_mac):
5727
            raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5728
        if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5729
          raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5730
                                     " modifying an existing nic")
5731

    
5732
    if nic_addremove > 1:
5733
      raise errors.OpPrereqError("Only one NIC add or remove operation"
5734
                                 " supported at a time")
5735

    
5736
  def ExpandNames(self):
5737
    self._ExpandAndLockInstance()
5738
    self.needed_locks[locking.LEVEL_NODE] = []
5739
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5740

    
5741
  def DeclareLocks(self, level):
5742
    if level == locking.LEVEL_NODE:
5743
      self._LockInstancesNodes()
5744

    
5745
  def BuildHooksEnv(self):
5746
    """Build hooks env.
5747

5748
    This runs on the master, primary and secondaries.
5749

5750
    """
5751
    args = dict()
5752
    if constants.BE_MEMORY in self.be_new:
5753
      args['memory'] = self.be_new[constants.BE_MEMORY]
5754
    if constants.BE_VCPUS in self.be_new:
5755
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
5756
    # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
5757
    # information at all.
5758
    if self.op.nics:
5759
      args['nics'] = []
5760
      nic_override = dict(self.op.nics)
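      # maps nic index (or DDM_ADD/DDM_REMOVE) to the requested changes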
5761
      for idx, nic in enumerate(self.instance.nics):
5762
        if idx in nic_override:
5763
          this_nic_override = nic_override[idx]
5764
        else:
5765
          this_nic_override = {}
5766
        if 'ip' in this_nic_override:
5767
          ip = this_nic_override['ip']
5768
        else:
5769
          ip = nic.ip
5770
        if 'bridge' in this_nic_override:
5771
          bridge = this_nic_override['bridge']
5772
        else:
5773
          bridge = nic.bridge
5774
        if 'mac' in this_nic_override:
5775
          mac = this_nic_override['mac']
5776
        else:
5777
          mac = nic.mac
5778
        args['nics'].append((ip, bridge, mac))
5779
      if constants.DDM_ADD in nic_override:
5780
        ip = nic_override[constants.DDM_ADD].get('ip', None)
5781
        bridge = nic_override[constants.DDM_ADD]['bridge']
5782
        mac = nic_override[constants.DDM_ADD]['mac']
5783
        args['nics'].append((ip, bridge, mac))
5784
      elif constants.DDM_REMOVE in nic_override:
5785
        del args['nics'][-1]
5786

    
5787
    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5788
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
5789
    return env, nl, nl
5790

    
5791
  def CheckPrereq(self):
5792
    """Check prerequisites.
5793

5794
    This checks the new parameters against the instance configuration and
    the involved nodes.
5795

5796
    """
5797
    force = self.force = self.op.force
5798

    
5799
    # checking the new params on the primary/secondary nodes
5800

    
5801
    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5802
    assert self.instance is not None, \
5803
      "Cannot retrieve locked instance %s" % self.op.instance_name
5804
    pnode = instance.primary_node
5805
    nodelist = list(instance.all_nodes)
5806

    
5807
    # hvparams processing
5808
    if self.op.hvparams:
5809
      i_hvdict = copy.deepcopy(instance.hvparams)
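      # a value of "default" drops the instance-level override so that the
      # cluster-level parameter applies again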
5810
      for key, val in self.op.hvparams.iteritems():
5811
        if val == constants.VALUE_DEFAULT:
5812
          try:
5813
            del i_hvdict[key]
5814
          except KeyError:
5815
            pass
5816
        else:
5817
          i_hvdict[key] = val
5818
      cluster = self.cfg.GetClusterInfo()
5819
      utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
5820
      hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5821
                                i_hvdict)
5822
      # local check
5823
      hypervisor.GetHypervisor(
5824
        instance.hypervisor).CheckParameterSyntax(hv_new)
5825
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5826
      self.hv_new = hv_new # the new actual values
5827
      self.hv_inst = i_hvdict # the new dict (without defaults)
5828
    else:
5829
      self.hv_new = self.hv_inst = {}
5830

    
5831
    # beparams processing
5832
    if self.op.beparams:
5833
      i_bedict = copy.deepcopy(instance.beparams)
5834
      for key, val in self.op.beparams.iteritems():
5835
        if val == constants.VALUE_DEFAULT:
5836
          try:
5837
            del i_bedict[key]
5838
          except KeyError:
5839
            pass
5840
        else:
5841
          i_bedict[key] = val
5842
      cluster = self.cfg.GetClusterInfo()
5843
      utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
5844
      be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5845
                                i_bedict)
5846
      self.be_new = be_new # the new actual values
5847
      self.be_inst = i_bedict # the new dict (without defaults)
5848
    else:
5849
      self.be_new = self.be_inst = {}
5850

    
5851
    self.warn = []
5852

    
5853
    if constants.BE_MEMORY in self.op.beparams and not self.force:
5854
      mem_check_list = [pnode]
5855
      if be_new[constants.BE_AUTO_BALANCE]:
5856
        # either we changed auto_balance to yes or it was from before
5857
        mem_check_list.extend(instance.secondary_nodes)
5858
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
5859
                                                  instance.hypervisor)
5860
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5861
                                         instance.hypervisor)
5862
      if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5863
        # Assume the primary node is unreachable and go ahead
5864
        self.warn.append("Can't get info from primary node %s" % pnode)
5865
      else:
5866
        if not instance_info.failed and instance_info.data:
5867
          current_mem = instance_info.data['memory']
5868
        else:
5869
          # Assume instance not running
5870
          # (there is a slight race condition here, but it's not very probable,
5871
          # and we have no other way to check)
5872
          current_mem = 0
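        # memory missing on the primary node: the requested memory, minus
        # what the instance already uses, minus what the node has free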
5873
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5874
                    nodeinfo[pnode].data['memory_free'])
5875
        if miss_mem > 0:
5876
          raise errors.OpPrereqError("This change will prevent the instance"
5877
                                     " from starting, due to %d MB of memory"
5878
                                     " missing on its primary node" % miss_mem)
5879

    
5880
      if be_new[constants.BE_AUTO_BALANCE]:
5881
        for node, nres in nodeinfo.iteritems():
5882
          if node not in instance.secondary_nodes:
5883
            continue
5884
          if nres.failed or not isinstance(nres.data, dict):
5885
            self.warn.append("Can't get info from secondary node %s" % node)
5886
          elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5887
            self.warn.append("Not enough memory to failover instance to"
5888
                             " secondary node %s" % node)
5889

    
5890
    # NIC processing
5891
    for nic_op, nic_dict in self.op.nics:
5892
      if nic_op == constants.DDM_REMOVE:
5893
        if not instance.nics:
5894
          raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5895
        continue
5896
      if nic_op != constants.DDM_ADD:
5897
        # an existing nic
5898
        if nic_op < 0 or nic_op >= len(instance.nics):
5899
          raise errors.OpPrereqError("Invalid NIC index %s, valid values"
5900
                                     " are 0 to %d" %
5901
                                     (nic_op, len(instance.nics)))
5902
      if 'bridge' in nic_dict:
5903
        nic_bridge = nic_dict['bridge']
5904
        if nic_bridge is None:
5905
          raise errors.OpPrereqError('Cannot set the nic bridge to None')
5906
        if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
5907
          msg = ("Bridge '%s' doesn't exist on one of"
5908
                 " the instance nodes" % nic_bridge)
5909
          if self.force:
5910
            self.warn.append(msg)
5911
          else:
5912
            raise errors.OpPrereqError(msg)
5913
      if 'mac' in nic_dict:
5914
        nic_mac = nic_dict['mac']
5915
        if nic_mac is None:
5916
          raise errors.OpPrereqError('Cannot set the nic mac to None')
5917
        elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5918
          # otherwise generate the mac
5919
          nic_dict['mac'] = self.cfg.GenerateMAC()
5920
        else:
5921
          # or validate/reserve the current one
5922
          if self.cfg.IsMacInUse(nic_mac):
5923
            raise errors.OpPrereqError("MAC address %s already in use"
5924
                                       " in cluster" % nic_mac)
5925

    
5926
    # DISK processing
5927
    if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
5928
      raise errors.OpPrereqError("Disk operations not supported for"
5929
                                 " diskless instances")
5930
    for disk_op, disk_dict in self.op.disks:
5931
      if disk_op == constants.DDM_REMOVE:
5932
        if len(instance.disks) == 1:
5933
          raise errors.OpPrereqError("Cannot remove the last disk of"
5934
                                     " an instance")
5935
        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
5936
        ins_l = ins_l[pnode]
5937
        if ins_l.failed or not isinstance(ins_l.data, list):
5938
          raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
5939
        if instance.name in ins_l.data:
5940
          raise errors.OpPrereqError("Instance is running, can't remove"
5941
                                     " disks.")
5942

    
5943
      if (disk_op == constants.DDM_ADD and
          len(instance.disks) >= constants.MAX_DISKS):
5945
        raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
5946
                                   " add more" % constants.MAX_DISKS)
5947
      if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
5948
        # an existing disk
5949
        if disk_op < 0 or disk_op >= len(instance.disks):
5950
          raise errors.OpPrereqError("Invalid disk index %s, valid values"
5951
                                     " are 0 to %d" %
5952
                                     (disk_op, len(instance.disks)))
5953

    
5954
    return
5955

    
5956
  def Exec(self, feedback_fn):
5957
    """Modifies an instance.
5958

5959
    All parameters take effect only at the next restart of the instance.
5960

5961
    """
5962
    # Process here the warnings from CheckPrereq, as we don't have a
5963
    # feedback_fn there.
5964
    for warn in self.warn:
5965
      feedback_fn("WARNING: %s" % warn)
5966

    
5967
    result = []
5968
    instance = self.instance
5969
    # disk changes
5970
    for disk_op, disk_dict in self.op.disks:
5971
      if disk_op == constants.DDM_REMOVE:
5972
        # remove the last disk
5973
        device = instance.disks.pop()
5974
        device_idx = len(instance.disks)
5975
        for node, disk in device.ComputeNodeTree(instance.primary_node):
5976
          self.cfg.SetDiskID(disk, node)
5977
          msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
5978
          if msg:
5979
            self.LogWarning("Could not remove disk/%d on node %s: %s,"
5980
                            " continuing anyway", device_idx, node, msg)
5981
        result.append(("disk/%d" % device_idx, "remove"))
5982
      elif disk_op == constants.DDM_ADD:
5983
        # add a new disk
5984
        if instance.disk_template == constants.DT_FILE:
5985
          file_driver, file_path = instance.disks[0].logical_id
5986
          file_path = os.path.dirname(file_path)
5987
        else:
5988
          file_driver = file_path = None
5989
        disk_idx_base = len(instance.disks)
5990
        new_disk = _GenerateDiskTemplate(self,
5991
                                         instance.disk_template,
5992
                                         instance.name, instance.primary_node,
5993
                                         instance.secondary_nodes,
5994
                                         [disk_dict],
5995
                                         file_path,
5996
                                         file_driver,
5997
                                         disk_idx_base)[0]
5998
        instance.disks.append(new_disk)
5999
        info = _GetInstanceInfoText(instance)
6000

    
6001
        logging.info("Creating volume %s for instance %s",
6002
                     new_disk.iv_name, instance.name)
6003
        # Note: this needs to be kept in sync with _CreateDisks
6004
        #HARDCODE
6005
        for node in instance.all_nodes:
6006
          f_create = node == instance.primary_node
6007
          try:
6008
            _CreateBlockDev(self, node, instance, new_disk,
6009
                            f_create, info, f_create)
6010
          except errors.OpExecError, err:
6011
            self.LogWarning("Failed to create volume %s (%s) on"
6012
                            " node %s: %s",
6013
                            new_disk.iv_name, new_disk, node, err)
6014
        result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6015
                       (new_disk.size, new_disk.mode)))
6016
      else:
6017
        # change a given disk
6018
        instance.disks[disk_op].mode = disk_dict['mode']
6019
        result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6020
    # NIC changes
6021
    for nic_op, nic_dict in self.op.nics:
6022
      if nic_op == constants.DDM_REMOVE:
6023
        # remove the last nic
6024
        del instance.nics[-1]
6025
        result.append(("nic.%d" % len(instance.nics), "remove"))
6026
      elif nic_op == constants.DDM_ADD:
6027
        # mac and bridge should be set, by now
6028
        mac = nic_dict['mac']
6029
        bridge = nic_dict['bridge']
6030
        new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
6031
                              bridge=bridge)
6032
        instance.nics.append(new_nic)
6033
        result.append(("nic.%d" % (len(instance.nics) - 1),
6034
                       "add:mac=%s,ip=%s,bridge=%s" %
6035
                       (new_nic.mac, new_nic.ip, new_nic.bridge)))
6036
      else:
6037
        # change a given nic
6038
        for key in 'mac', 'ip', 'bridge':
6039
          if key in nic_dict:
6040
            setattr(instance.nics[nic_op], key, nic_dict[key])
6041
            result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
6042

    
6043
    # hvparams changes
6044
    if self.op.hvparams:
6045
      instance.hvparams = self.hv_inst
6046
      for key, val in self.op.hvparams.iteritems():
6047
        result.append(("hv/%s" % key, val))
6048

    
6049
    # beparams changes
6050
    if self.op.beparams:
6051
      instance.beparams = self.be_inst
6052
      for key, val in self.op.beparams.iteritems():
6053
        result.append(("be/%s" % key, val))
6054

    
6055
    self.cfg.Update(instance)
6056

    
6057
    return result


class LUQueryExports(NoHooksLU):
6061
  """Query the exports list
6062

6063
  """
6064
  _OP_REQP = ['nodes']
6065
  REQ_BGL = False
6066

    
6067
  def ExpandNames(self):
6068
    self.needed_locks = {}
6069
    self.share_locks[locking.LEVEL_NODE] = 1
6070
    if not self.op.nodes:
6071
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6072
    else:
6073
      self.needed_locks[locking.LEVEL_NODE] = \
6074
        _GetWantedNodes(self, self.op.nodes)
6075

    
6076
  def CheckPrereq(self):
6077
    """Check prerequisites.
6078

6079
    """
6080
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6081

    
6082
  def Exec(self, feedback_fn):
6083
    """Compute the list of all the exported system images.
6084

6085
    @rtype: dict
6086
    @return: a dictionary with the structure node->(export-list)
6087
        where export-list is a list of the instances exported on
6088
        that node.
6089

6090
    """
6091
    rpcresult = self.rpc.call_export_list(self.nodes)
6092
    result = {}
6093
    for node in rpcresult:
6094
      if rpcresult[node].failed:
6095
        result[node] = False
6096
      else:
6097
        result[node] = rpcresult[node].data
6098

    
6099
    return result


class LUExportInstance(LogicalUnit):
6103
  """Export an instance to an image in the cluster.
6104

6105
  """
6106
  HPATH = "instance-export"
6107
  HTYPE = constants.HTYPE_INSTANCE
6108
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
6109
  REQ_BGL = False
6110

    
6111
  def ExpandNames(self):
6112
    self._ExpandAndLockInstance()
6113
    # FIXME: lock only instance primary and destination node
6114
    #
6115
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
6121
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6122

    
6123
  def DeclareLocks(self, level):
6124
    """Last minute lock declaration."""
6125
    # All nodes are locked anyway, so nothing to do here.
6126

    
6127
  def BuildHooksEnv(self):
6128
    """Build hooks env.
6129

6130
    This will run on the master, primary node and target node.
6131

6132
    """
6133
    env = {
6134
      "EXPORT_NODE": self.op.target_node,
6135
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6136
      }
6137
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6138
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6139
          self.op.target_node]
6140
    return env, nl, nl
6141

    
6142
  def CheckPrereq(self):
6143
    """Check prerequisites.
6144

6145
    This checks that the instance and node names are valid.
6146

6147
    """
6148
    instance_name = self.op.instance_name
6149
    self.instance = self.cfg.GetInstanceInfo(instance_name)
6150
    assert self.instance is not None, \
6151
          "Cannot retrieve locked instance %s" % self.op.instance_name
6152
    _CheckNodeOnline(self, self.instance.primary_node)
6153

    
6154
    self.dst_node = self.cfg.GetNodeInfo(
6155
      self.cfg.ExpandNodeName(self.op.target_node))
6156

    
6157
    if self.dst_node is None:
6158
      # this means the node name is wrong, not that the node is not locked
6159
      raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6160
    _CheckNodeOnline(self, self.dst_node.name)
6161
    _CheckNodeNotDrained(self, self.dst_node.name)
6162

    
6163
    # instance disk type verification
6164
    for disk in self.instance.disks:
6165
      if disk.dev_type == constants.LD_FILE:
6166
        raise errors.OpPrereqError("Export not supported for instances with"
6167
                                   " file-based disks")
6168

    
6169
  def Exec(self, feedback_fn):
6170
    """Export an instance to an image in the cluster.
6171

6172
    """
6173
    instance = self.instance
6174
    dst_node = self.dst_node
6175
    src_node = instance.primary_node
6176
    if self.op.shutdown:
6177
      # shutdown the instance, but not the disks
6178
      result = self.rpc.call_instance_shutdown(src_node, instance)
6179
      msg = result.RemoteFailMsg()
6180
      if msg:
6181
        raise errors.OpExecError("Could not shutdown instance %s on"
6182
                                 " node %s: %s" %
6183
                                 (instance.name, src_node, msg))
6184

    
6185
    vgname = self.cfg.GetVGName()
6186

    
6187
    snap_disks = []
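    # one entry per instance disk: a snapshot Disk object, or False if the
    # snapshot on the source node failed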
6188

    
6189
    # set the disks ID correctly since call_instance_start needs the
6190
    # correct drbd minor to create the symlinks
6191
    for disk in instance.disks:
6192
      self.cfg.SetDiskID(disk, src_node)
6193

    
6194
    try:
6195
      for disk in instance.disks:
6196
        # new_dev_name will be a snapshot of an lvm leaf of the one we passed
6197
        new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
6198
        if new_dev_name.failed or not new_dev_name.data:
6199
          self.LogWarning("Could not snapshot block device %s on node %s",
6200
                          disk.logical_id[1], src_node)
6201
          snap_disks.append(False)
6202
        else:
6203
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6204
                                 logical_id=(vgname, new_dev_name.data),
6205
                                 physical_id=(vgname, new_dev_name.data),
6206
                                 iv_name=disk.iv_name)
6207
          snap_disks.append(new_dev)
6208

    
6209
    finally:
6210
      if self.op.shutdown and instance.admin_up:
6211
        result = self.rpc.call_instance_start(src_node, instance)
6212
        msg = result.RemoteFailMsg()
6213
        if msg:
6214
          _ShutdownInstanceDisks(self, instance)
6215
          raise errors.OpExecError("Could not start instance: %s" % msg)
6216

    
6217
    # TODO: check for size
6218

    
6219
    cluster_name = self.cfg.GetClusterName()
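    # copy each successful snapshot to the destination node, then remove the
    # snapshot from the source node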
6220
    for idx, dev in enumerate(snap_disks):
6221
      if dev:
6222
        result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6223
                                               instance, cluster_name, idx)
6224
        if result.failed or not result.data:
6225
          self.LogWarning("Could not export block device %s from node %s to"
6226
                          " node %s", dev.logical_id[1], src_node,
6227
                          dst_node.name)
6228
        msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6229
        if msg:
6230
          self.LogWarning("Could not remove snapshot block device %s from node"
6231
                          " %s: %s", dev.logical_id[1], src_node, msg)
6232

    
6233
    result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6234
    if result.failed or not result.data:
6235
      self.LogWarning("Could not finalize export for instance %s on node %s",
6236
                      instance.name, dst_node.name)
6237

    
6238
    nodelist = self.cfg.GetNodeList()
6239
    nodelist.remove(dst_node.name)
6240

    
6241
    # on one-node clusters nodelist will be empty after the removal; if we
    # proceeded, the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
6244
    if nodelist:
6245
      exportlist = self.rpc.call_export_list(nodelist)
6246
      for node in exportlist:
6247
        if exportlist[node].failed:
6248
          continue
6249
        if instance.name in exportlist[node].data:
6250
          if not self.rpc.call_export_remove(node, instance.name):
6251
            self.LogWarning("Could not remove older export for instance %s"
6252
                            " on node %s", instance.name, node)
6253

    
6254

    
6255
class LURemoveExport(NoHooksLU):
6256
  """Remove exports related to the named instance.
6257

6258
  """
6259
  _OP_REQP = ["instance_name"]
6260
  REQ_BGL = False
6261

    
6262
  def ExpandNames(self):
6263
    self.needed_locks = {}
6264
    # We need all nodes to be locked in order for RemoveExport to work, but we
6265
    # don't need to lock the instance itself, as nothing will happen to it (and
6266
    # we can remove exports also for a removed instance)
6267
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6268

    
6269
  def CheckPrereq(self):
6270
    """Check prerequisites.
6271
    """
6272
    pass
6273

    
6274
  def Exec(self, feedback_fn):
6275
    """Remove any export.
6276

6277
    """
6278
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6279
    # If the instance was not found we'll try with the name that was passed in.
6280
    # This will only work if it was an FQDN, though.
6281
    fqdn_warn = False
6282
    if not instance_name:
6283
      fqdn_warn = True
6284
      instance_name = self.op.instance_name
6285

    
6286
    exportlist = self.rpc.call_export_list(self.acquired_locks[
6287
      locking.LEVEL_NODE])
6288
    found = False
6289
    for node in exportlist:
6290
      if exportlist[node].failed:
6291
        self.LogWarning("Failed to query node %s, continuing" % node)
6292
        continue
6293
      if instance_name in exportlist[node].data:
6294
        found = True
6295
        result = self.rpc.call_export_remove(node, instance_name)
6296
        if result.failed or not result.data:
6297
          logging.error("Could not remove export for instance %s"
6298
                        " on node %s", instance_name, node)
6299

    
6300
    if fqdn_warn and not found:
6301
      feedback_fn("Export not found. If trying to remove an export belonging"
6302
                  " to a deleted instance please use its Fully Qualified"
6303
                  " Domain Name.")
6304

    
6305

    
6306
class TagsLU(NoHooksLU):
6307
  """Generic tags LU.
6308

6309
  This is an abstract class which is the parent of all the other tags LUs.
6310

6311
  """
6312

    
6313
  def ExpandNames(self):
6314
    self.needed_locks = {}
6315
    if self.op.kind == constants.TAG_NODE:
6316
      name = self.cfg.ExpandNodeName(self.op.name)
6317
      if name is None:
6318
        raise errors.OpPrereqError("Invalid node name (%s)" %
6319
                                   (self.op.name,))
6320
      self.op.name = name
6321
      self.needed_locks[locking.LEVEL_NODE] = name
6322
    elif self.op.kind == constants.TAG_INSTANCE:
6323
      name = self.cfg.ExpandInstanceName(self.op.name)
6324
      if name is None:
6325
        raise errors.OpPrereqError("Invalid instance name (%s)" %
6326
                                   (self.op.name,))
6327
      self.op.name = name
6328
      self.needed_locks[locking.LEVEL_INSTANCE] = name
6329

    
6330
  def CheckPrereq(self):
6331
    """Check prerequisites.
6332

6333
    """
6334
    if self.op.kind == constants.TAG_CLUSTER:
6335
      self.target = self.cfg.GetClusterInfo()
6336
    elif self.op.kind == constants.TAG_NODE:
6337
      self.target = self.cfg.GetNodeInfo(self.op.name)
6338
    elif self.op.kind == constants.TAG_INSTANCE:
6339
      self.target = self.cfg.GetInstanceInfo(self.op.name)
6340
    else:
6341
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6342
                                 str(self.op.kind))


class LUGetTags(TagsLU):
6346
  """Returns the tags of a given object.
6347

6348
  """
6349
  _OP_REQP = ["kind", "name"]
6350
  REQ_BGL = False
6351

    
6352
  def Exec(self, feedback_fn):
6353
    """Returns the tag list.
6354

6355
    """
6356
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
6360
  """Searches the tags for a given pattern.
6361

6362
  """
6363
  _OP_REQP = ["pattern"]
6364
  REQ_BGL = False
6365

    
6366
  def ExpandNames(self):
6367
    self.needed_locks = {}
6368

    
6369
  def CheckPrereq(self):
6370
    """Check prerequisites.
6371

6372
    This checks the pattern passed for validity by compiling it.
6373

6374
    """
6375
    try:
6376
      self.re = re.compile(self.op.pattern)
6377
    except re.error, err:
6378
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6379
                                 (self.op.pattern, err))
6380

    
6381
  def Exec(self, feedback_fn):
6382
    """Returns the tag list.
6383

6384
    """
6385
    cfg = self.cfg
6386
    tgts = [("/cluster", cfg.GetClusterInfo())]
6387
    ilist = cfg.GetAllInstancesInfo().values()
6388
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6389
    nlist = cfg.GetAllNodesInfo().values()
6390
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6391
    results = []
6392
    for path, target in tgts:
6393
      for tag in target.GetTags():
6394
        if self.re.search(tag):
6395
          results.append((path, tag))
6396
    return results


class LUAddTags(TagsLU):
6400
  """Sets a tag on a given object.
6401

6402
  """
6403
  _OP_REQP = ["kind", "name", "tags"]
6404
  REQ_BGL = False
6405

    
6406
  def CheckPrereq(self):
6407
    """Check prerequisites.
6408

6409
    This checks the type and length of the tag name and value.
6410

6411
    """
6412
    TagsLU.CheckPrereq(self)
6413
    for tag in self.op.tags:
6414
      objects.TaggableObject.ValidateTag(tag)
6415

    
6416
  def Exec(self, feedback_fn):
6417
    """Sets the tag.
6418

6419
    """
6420
    try:
6421
      for tag in self.op.tags:
6422
        self.target.AddTag(tag)
6423
    except errors.TagError, err:
6424
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
6425
    try:
6426
      self.cfg.Update(self.target)
6427
    except errors.ConfigurationError:
6428
      raise errors.OpRetryError("There has been a modification to the"
6429
                                " config file and the operation has been"
6430
                                " aborted. Please retry.")
6431

    
6432

    
6433
class LUDelTags(TagsLU):
6434
  """Delete a list of tags from a given object.
6435

6436
  """
6437
  _OP_REQP = ["kind", "name", "tags"]
6438
  REQ_BGL = False
6439

    
6440
  def CheckPrereq(self):
6441
    """Check prerequisites.
6442

6443
    This checks that we have the given tag.
6444

6445
    """
6446
    TagsLU.CheckPrereq(self)
6447
    for tag in self.op.tags:
6448
      objects.TaggableObject.ValidateTag(tag)
6449
    del_tags = frozenset(self.op.tags)
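    # every tag requested for removal must currently be set on the target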
6450
    cur_tags = self.target.GetTags()
6451
    if not del_tags <= cur_tags:
6452
      diff_tags = del_tags - cur_tags
6453
      diff_names = ["'%s'" % tag for tag in diff_tags]
6454
      diff_names.sort()
6455
      raise errors.OpPrereqError("Tag(s) %s not found" %
6456
                                 (",".join(diff_names)))
6457

    
6458
  def Exec(self, feedback_fn):
6459
    """Remove the tag from the object.
6460

6461
    """
6462
    for tag in self.op.tags:
6463
      self.target.RemoveTag(tag)
6464
    try:
6465
      self.cfg.Update(self.target)
6466
    except errors.ConfigurationError:
6467
      raise errors.OpRetryError("There has been a modification to the"
6468
                                " config file and the operation has been"
6469
                                " aborted. Please retry.")
6470

    
6471

    
6472
class LUTestDelay(NoHooksLU):
6473
  """Sleep for a specified amount of time.
6474

6475
  This LU sleeps on the master and/or nodes for a specified amount of
6476
  time.
6477

6478
  """
6479
  _OP_REQP = ["duration", "on_master", "on_nodes"]
6480
  REQ_BGL = False
6481

    
6482
  def ExpandNames(self):
6483
    """Expand names and set required locks.
6484

6485
    This expands the node list, if any.
6486

6487
    """
6488
    self.needed_locks = {}
6489
    if self.op.on_nodes:
6490
      # _GetWantedNodes can be used here, but is not always appropriate to use
6491
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6492
      # more information.
6493
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6494
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6495

    
6496
  def CheckPrereq(self):
6497
    """Check prerequisites.
6498

6499
    """
6500

    
6501
  def Exec(self, feedback_fn):
6502
    """Do the actual sleep.
6503

6504
    """
6505
    if self.op.on_master:
6506
      if not utils.TestDelay(self.op.duration):
6507
        raise errors.OpExecError("Error during master delay test")
6508
    if self.op.on_nodes:
6509
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6510
      if not result:
6511
        raise errors.OpExecError("Complete failure from rpc call")
6512
      for node, node_result in result.items():
6513
        node_result.Raise()
6514
        if not node_result.data:
6515
          raise errors.OpExecError("Failure during rpc call to node %s,"
6516
                                   " result: %s" % (node, node_result.data))
6517

    
6518

    
6519
class IAllocator(object):
6520
  """IAllocator framework.
6521

6522
  An IAllocator instance has four sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the mode-specific _ALLO_KEYS or _RELO_KEYS
      class attributes are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage
6530

6531
  """
6532
  _ALLO_KEYS = [
6533
    "mem_size", "disks", "disk_template",
6534
    "os", "tags", "nics", "vcpus", "hypervisor",
6535
    ]
6536
  _RELO_KEYS = [
6537
    "relocate_from",
6538
    ]
6539

    
6540
  def __init__(self, lu, mode, name, **kwargs):
6541
    self.lu = lu
6542
    # init buffer variables
6543
    self.in_text = self.out_text = self.in_data = self.out_data = None
6544
    # init all input fields so that pylint is happy
6545
    self.mode = mode
6546
    self.name = name
6547
    self.mem_size = self.disks = self.disk_template = None
6548
    self.os = self.tags = self.nics = self.vcpus = None
6549
    self.hypervisor = None
6550
    self.relocate_from = None
6551
    # computed fields
6552
    self.required_nodes = None
6553
    # init result fields
6554
    self.success = self.info = self.nodes = None
6555
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6556
      keyset = self._ALLO_KEYS
6557
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6558
      keyset = self._RELO_KEYS
6559
    else:
6560
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6561
                                   " IAllocator" % self.mode)
6562
    for key in kwargs:
6563
      if key not in keyset:
6564
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
6565
                                     " IAllocator" % key)
6566
      setattr(self, key, kwargs[key])
6567
    for key in keyset:
6568
      if key not in kwargs:
6569
        raise errors.ProgrammerError("Missing input parameter '%s' to"
6570
                                     " IAllocator" % key)
6571
    self._BuildInputData()
6572

    
6573
  def _ComputeClusterData(self):
6574
    """Compute the generic allocator input data.
6575

6576
    This is the data that is independent of the actual operation.
6577

6578
    """
6579
    cfg = self.lu.cfg
6580
    cluster_info = cfg.GetClusterInfo()
6581
    # cluster data
6582
    data = {
6583
      "version": 1,
6584
      "cluster_name": cfg.GetClusterName(),
6585
      "cluster_tags": list(cluster_info.GetTags()),
6586
      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6587
      # we don't have job IDs
6588
      }
6589
    iinfo = cfg.GetAllInstancesInfo().values()
6590
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6591

    
6592
    # node data
6593
    node_results = {}
6594
    node_list = cfg.GetNodeList()
6595

    
6596
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6597
      hypervisor_name = self.hypervisor
6598
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6599
      hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6600

    
6601
    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6602
                                           hypervisor_name)
6603
    node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6604
                       cluster_info.enabled_hypervisors)
6605
    for nname, nresult in node_data.items():
6606
      # first fill in static (config-based) values
6607
      ninfo = cfg.GetNodeInfo(nname)
6608
      pnr = {
6609
        "tags": list(ninfo.GetTags()),
6610
        "primary_ip": ninfo.primary_ip,
6611
        "secondary_ip": ninfo.secondary_ip,
6612
        "offline": ninfo.offline,
6613
        "drained": ninfo.drained,
6614
        "master_candidate": ninfo.master_candidate,
6615
        }
6616

    
6617
      if not ninfo.offline:
6618
        nresult.Raise()
6619
        if not isinstance(nresult.data, dict):
6620
          raise errors.OpExecError("Can't get data for node %s" % nname)
6621
        remote_info = nresult.data
6622
        for attr in ['memory_total', 'memory_free', 'memory_dom0',
6623
                     'vg_size', 'vg_free', 'cpu_total']:
6624
          if attr not in remote_info:
6625
            raise errors.OpExecError("Node '%s' didn't return attribute"
6626
                                     " '%s'" % (nname, attr))
6627
          try:
6628
            remote_info[attr] = int(remote_info[attr])
6629
          except ValueError, err:
6630
            raise errors.OpExecError("Node '%s' returned invalid value"
6631
                                     " for '%s': %s" % (nname, attr, err))
6632
        # compute memory used by primary instances
6633
        i_p_mem = i_p_up_mem = 0
6634
        for iinfo, beinfo in i_list:
6635
          if iinfo.primary_node == nname:
6636
            i_p_mem += beinfo[constants.BE_MEMORY]
6637
            if iinfo.name not in node_iinfo[nname].data:
6638
              i_used_mem = 0
6639
            else:
6640
              i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
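            # reduce the node's free memory by the amount this instance is
            # entitled to but does not currently use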
6641
            i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6642
            remote_info['memory_free'] -= max(0, i_mem_diff)
6643

    
6644
            if iinfo.admin_up:
6645
              i_p_up_mem += beinfo[constants.BE_MEMORY]
6646

    
6647
        # compute memory used by instances
6648
        pnr_dyn = {
6649
          "total_memory": remote_info['memory_total'],
6650
          "reserved_memory": remote_info['memory_dom0'],
6651
          "free_memory": remote_info['memory_free'],
6652
          "total_disk": remote_info['vg_size'],
6653
          "free_disk": remote_info['vg_free'],
6654
          "total_cpus": remote_info['cpu_total'],
6655
          "i_pri_memory": i_p_mem,
6656
          "i_pri_up_memory": i_p_up_mem,
6657
          }
6658
        pnr.update(pnr_dyn)
6659

    
6660
      node_results[nname] = pnr
6661
    data["nodes"] = node_results
6662

    
6663
    # instance data
6664
    instance_data = {}
6665
    for iinfo, beinfo in i_list:
6666
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6667
                  for n in iinfo.nics]
6668
      pir = {
6669
        "tags": list(iinfo.GetTags()),
6670
        "admin_up": iinfo.admin_up,
6671
        "vcpus": beinfo[constants.BE_VCPUS],
6672
        "memory": beinfo[constants.BE_MEMORY],
6673
        "os": iinfo.os,
6674
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6675
        "nics": nic_data,
6676
        "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6677
        "disk_template": iinfo.disk_template,
6678
        "hypervisor": iinfo.hypervisor,
6679
        }
6680
      instance_data[iinfo.name] = pir
6681

    
6682
    data["instances"] = instance_data
6683

    
6684
    self.in_data = data
6685

    
6686
  def _AddNewInstance(self):
6687
    """Add new instance data to allocator structure.
6688

6689
    This, in combination with _ComputeClusterData, will create the
6690
    correct structure needed as input for the allocator.
6691

6692
    The checks for the completeness of the opcode must have already been
6693
    done.
6694

6695
    """
6696
    data = self.in_data
6697

    
6698
    disk_space = _ComputeDiskSize(self.disk_template, self.disks)
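    # network-mirrored disk templates (e.g. drbd) need two nodes, all others
    # only one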
6699

    
6700
    if self.disk_template in constants.DTS_NET_MIRROR:
6701
      self.required_nodes = 2
6702
    else:
6703
      self.required_nodes = 1
6704
    request = {
6705
      "type": "allocate",
6706
      "name": self.name,
6707
      "disk_template": self.disk_template,
6708
      "tags": self.tags,
6709
      "os": self.os,
6710
      "vcpus": self.vcpus,
6711
      "memory": self.mem_size,
6712
      "disks": self.disks,
6713
      "disk_space_total": disk_space,
6714
      "nics": self.nics,
6715
      "required_nodes": self.required_nodes,
6716
      }
6717
    data["request"] = request
6718

    
6719
  def _AddRelocateInstance(self):
6720
    """Add relocate instance data to allocator structure.
6721

6722
    This, in combination with _ComputeClusterData, will create the
6723
    correct structure needed as input for the allocator.
6724

6725
    The checks for the completeness of the opcode must have already been
6726
    done.
6727

6728
    """
6729
    instance = self.lu.cfg.GetInstanceInfo(self.name)
6730
    if instance is None:
6731
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
6732
                                   " IAllocator" % self.name)
6733

    
6734
    if instance.disk_template not in constants.DTS_NET_MIRROR:
6735
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6736

    
6737
    if len(instance.secondary_nodes) != 1:
6738
      raise errors.OpPrereqError("Instance has not exactly one secondary node")
6739

    
6740
    self.required_nodes = 1
6741
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
6742
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6743

    
6744
    request = {
6745
      "type": "relocate",
6746
      "name": self.name,
6747
      "disk_space_total": disk_space,
6748
      "required_nodes": self.required_nodes,
6749
      "relocate_from": self.relocate_from,
6750
      }
6751
    self.in_data["request"] = request
6752

    
6753
  def _BuildInputData(self):
6754
    """Build input data structures.
6755

6756
    """
6757
    self._ComputeClusterData()
6758

    
6759
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6760
      self._AddNewInstance()
6761
    else:
6762
      self._AddRelocateInstance()
6763

    
6764
    self.in_text = serializer.Dump(self.in_data)
6765

    
6766
  def Run(self, name, validate=True, call_fn=None):
6767
    """Run an instance allocator and return the results.
6768

6769
    """
6770
    if call_fn is None:
6771
      call_fn = self.lu.rpc.call_iallocator_runner
6772
    data = self.in_text
6773

    
6774
    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6775
    result.Raise()
6776

    
6777
    if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6778
      raise errors.OpExecError("Invalid result from master iallocator runner")
6779

    
6780
    rcode, stdout, stderr, fail = result.data
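    # the runner returns a (result code, stdout, stderr, failure reason) tuple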
6781

    
6782
    if rcode == constants.IARUN_NOTFOUND:
6783
      raise errors.OpExecError("Can't find allocator '%s'" % name)
6784
    elif rcode == constants.IARUN_FAILURE:
6785
      raise errors.OpExecError("Instance allocator call failed: %s,"
6786
                               " output: %s" % (fail, stdout+stderr))
6787
    self.out_text = stdout
6788
    if validate:
6789
      self._ValidateResult()
6790

    
6791
  def _ValidateResult(self):
6792
    """Process the allocator results.
6793

6794
    This will process and if successful save the result in
6795
    self.out_data and the other parameters.
6796

6797
    """
6798
    try:
6799
      rdict = serializer.Load(self.out_text)
6800
    except Exception, err:
6801
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6802

    
6803
    if not isinstance(rdict, dict):
6804
      raise errors.OpExecError("Can't parse iallocator results: not a dict")
6805

    
6806
    for key in "success", "info", "nodes":
6807
      if key not in rdict:
6808
        raise errors.OpExecError("Can't parse iallocator results:"
6809
                                 " missing key '%s'" % key)
6810
      setattr(self, key, rdict[key])
6811

    
6812
    if not isinstance(rdict["nodes"], list):
6813
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6814
                               " is not a list")
6815
    self.out_data = rdict


class LUTestAllocator(NoHooksLU):
6819
  """Run allocator tests.
6820

6821
  This LU runs the allocator tests
6822

6823
  """
6824
  _OP_REQP = ["direction", "mode", "name"]
6825

    
6826
  def CheckPrereq(self):
6827
    """Check prerequisites.
6828

6829
    This checks the opcode parameters depending on the direction and mode of
    the test.
6830

6831
    """
6832
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6833
      for attr in ["name", "mem_size", "disks", "disk_template",
6834
                   "os", "tags", "nics", "vcpus"]:
6835
        if not hasattr(self.op, attr):
6836
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6837
                                     attr)
6838
      iname = self.cfg.ExpandInstanceName(self.op.name)
6839
      if iname is not None:
6840
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6841
                                   iname)
6842
      if not isinstance(self.op.nics, list):
6843
        raise errors.OpPrereqError("Invalid parameter 'nics'")
6844
      for row in self.op.nics:
6845
        if (not isinstance(row, dict) or
6846
            "mac" not in row or
6847
            "ip" not in row or
6848
            "bridge" not in row):
6849
          raise errors.OpPrereqError("Invalid contents of the"
6850
                                     " 'nics' parameter")
6851
      if not isinstance(self.op.disks, list):
6852
        raise errors.OpPrereqError("Invalid parameter 'disks'")
6853
      for row in self.op.disks:
6854
        if (not isinstance(row, dict) or
6855
            "size" not in row or
6856
            not isinstance(row["size"], int) or
6857
            "mode" not in row or
6858
            row["mode"] not in ['r', 'w']):
6859
          raise errors.OpPrereqError("Invalid contents of the"
6860
                                     " 'disks' parameter")
6861
      if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
6862
        self.op.hypervisor = self.cfg.GetHypervisorType()
6863
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
6864
      if not hasattr(self.op, "name"):
6865
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
6866
      fname = self.cfg.ExpandInstanceName(self.op.name)
6867
      if fname is None:
6868
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
6869
                                   self.op.name)
6870
      self.op.name = fname
6871
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
6872
    else:
6873
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
6874
                                 self.op.mode)
6875

    
6876
    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
6877
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
6878
        raise errors.OpPrereqError("Missing allocator name")
6879
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
6880
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
6881
                                 self.op.direction)
6882

    
6883
  def Exec(self, feedback_fn):
6884
    """Run the allocator test.
6885

6886
    """
6887
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6888
      ial = IAllocator(self,
6889
                       mode=self.op.mode,
6890
                       name=self.op.name,
6891
                       mem_size=self.op.mem_size,
6892
                       disks=self.op.disks,
6893
                       disk_template=self.op.disk_template,
6894
                       os=self.op.os,
6895
                       tags=self.op.tags,
6896
                       nics=self.op.nics,
6897
                       vcpus=self.op.vcpus,
6898
                       hypervisor=self.op.hypervisor,
6899
                       )
6900
    else:
6901
      ial = IAllocator(self,
6902
                       mode=self.op.mode,
6903
                       name=self.op.name,
6904
                       relocate_from=list(self.relocate_from),
6905
                       )
6906

    
6907
    if self.op.direction == constants.IALLOCATOR_DIR_IN:
6908
      result = ial.in_text
6909
    else:
6910
      ial.Run(self.op.allocator, validate=False)
6911
      result = ial.out_text
6912
    return result