root / lib / cmdlib.py @ 35e994e9


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33
import logging
34
import copy
35
import random
36

    
37
from ganeti import ssh
38
from ganeti import utils
39
from ganeti import errors
40
from ganeti import hypervisor
41
from ganeti import locking
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import opcodes
45
from ganeti import serializer
46
from ganeti import ssconf
47

    
48

    
49
class LogicalUnit(object):
50
  """Logical Unit base class.
51

52
  Subclasses must follow these rules:
53
    - implement ExpandNames
54
    - implement CheckPrereq
55
    - implement Exec
56
    - implement BuildHooksEnv
57
    - redefine HPATH and HTYPE
58
    - optionally redefine their run requirements:
59
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60

61
  Note that all commands require root permissions.
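
  As an illustration only (the opcode and names below are hypothetical and
  not part of the real code base), a minimal subclass following these rules
  could look like::

    class LUHypotheticalNoop(LogicalUnit):
      # a do-nothing LU used purely as a documentation example
      HPATH = "noop"
      HTYPE = constants.HTYPE_CLUSTER
      _OP_REQP = []        # no required opcode parameters
      REQ_BGL = False      # can run without the Big Ganeti Lock

      def ExpandNames(self):
        self.needed_locks = {}   # concurrent, but needs no locks

      def CheckPrereq(self):
        pass                     # nothing to check cluster-side

      def BuildHooksEnv(self):
        env = {"OP_TARGET": self.cfg.GetClusterName()}
        return env, [], []       # env, pre-hook nodes, post-hook nodes

      def Exec(self, feedback_fn):
        feedback_fn("Nothing to do")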
62

63
  """
64
  HPATH = None
65
  HTYPE = None
66
  _OP_REQP = []
67
  REQ_BGL = True
68

    
69
  def __init__(self, processor, op, context, rpc):
70
    """Constructor for LogicalUnit.
71

72
    This needs to be overridden in derived classes in order to check op
73
    validity.
74

75
    """
76
    self.proc = processor
77
    self.op = op
78
    self.cfg = context.cfg
79
    self.context = context
80
    self.rpc = rpc
81
    # Dicts used to declare locking needs to mcpu
82
    self.needed_locks = None
83
    self.acquired_locks = {}
84
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85
    self.add_locks = {}
86
    self.remove_locks = {}
87
    # Used to force good behavior when calling helper functions
88
    self.recalculate_locks = {}
89
    self.__ssh = None
90
    # logging
91
    self.LogWarning = processor.LogWarning
92
    self.LogInfo = processor.LogInfo
93

    
94
    for attr_name in self._OP_REQP:
95
      attr_val = getattr(op, attr_name, None)
96
      if attr_val is None:
97
        raise errors.OpPrereqError("Required parameter '%s' missing" %
98
                                   attr_name)
99
    self.CheckArguments()
100

    
101
  def __GetSSH(self):
102
    """Returns the SshRunner object
103

104
    """
105
    if not self.__ssh:
106
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
107
    return self.__ssh
108

    
109
  ssh = property(fget=__GetSSH)
110

    
111
  def CheckArguments(self):
112
    """Check syntactic validity for the opcode arguments.
113

114
    This method is for doing a simple syntactic check and ensuring the
115
    validity of opcode parameters, without any cluster-related
116
    checks. While the same can be accomplished in ExpandNames and/or
117
    CheckPrereq, doing these checks separately is better because:
118

119
      - ExpandNames is left as a purely lock-related function
120
      - CheckPrereq is run after we have acquired locks (and possibly
121
        waited for them)
122

123
    The function is allowed to change the self.op attribute so that
124
    later methods no longer need to worry about missing parameters.
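
    A sketch of a typical override (the "force" attribute is purely
    illustrative)::

      def CheckArguments(self):
        # give an optional opcode parameter a default value if it's missing
        if not hasattr(self.op, "force"):
          self.op.force = False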
125

126
    """
127
    pass
128

    
129
  def ExpandNames(self):
130
    """Expand names for this LU.
131

132
    This method is called before starting to execute the opcode, and it should
133
    update all the parameters of the opcode to their canonical form (e.g. a
134
    short node name must be fully expanded after this method has successfully
135
    completed). This way locking, hooks, logging, etc. can work correctly.
136

137
    LUs which implement this method must also populate the self.needed_locks
138
    member, as a dict with lock levels as keys, and a list of needed lock names
139
    as values. Rules:
140

141
      - use an empty dict if you don't need any lock
142
      - if you don't need any lock at a particular level omit that level
143
      - don't put anything for the BGL level
144
      - if you want all locks at a level use locking.ALL_SET as a value
145

146
    If you need to share locks (rather than acquire them exclusively) at one
147
    level you can modify self.share_locks, setting a true value (usually 1) for
148
    that level. By default locks are not shared.
149

150
    Examples::
151

152
      # Acquire all nodes and one instance
153
      self.needed_locks = {
154
        locking.LEVEL_NODE: locking.ALL_SET,
155
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
156
      }
157
      # Acquire just two nodes
158
      self.needed_locks = {
159
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
160
      }
161
      # Acquire no locks
162
      self.needed_locks = {} # No, you can't leave it to the default value None
163

164
    """
165
    # The implementation of this method is mandatory only if the new LU is
166
    # concurrent, so that old LUs don't need to be changed all at the same
167
    # time.
168
    if self.REQ_BGL:
169
      self.needed_locks = {} # Exclusive LUs don't need locks.
170
    else:
171
      raise NotImplementedError
172

    
173
  def DeclareLocks(self, level):
174
    """Declare LU locking needs for a level
175

176
    While most LUs can just declare their locking needs at ExpandNames time,
177
    sometimes there is a need to calculate some locks after having acquired
178
    the ones before. This function is called just before acquiring locks at a
179
    particular level, but after acquiring the ones at lower levels, and permits
180
    such calculations. It can be used to modify self.needed_locks, and by
181
    default it does nothing.
182

183
    This function is only called if you have something already set in
184
    self.needed_locks for the level.
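
    As a sketch, a typical override mirrors the _LockInstancesNodes helper
    defined below::

      def DeclareLocks(self, level):
        if level == locking.LEVEL_NODE:
          # node locks can only be computed once the instance locks are held
          self._LockInstancesNodes()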
185

186
    @param level: Locking level which is going to be locked
187
    @type level: member of ganeti.locking.LEVELS
188

189
    """
190

    
191
  def CheckPrereq(self):
192
    """Check prerequisites for this LU.
193

194
    This method should check that the prerequisites for the execution
195
    of this LU are fulfilled. It can do internode communication, but
196
    it should be idempotent - no cluster or system changes are
197
    allowed.
198

199
    The method should raise errors.OpPrereqError in case something is
200
    not fulfilled. Its return value is ignored.
201

202
    This method should also update all the parameters of the opcode to
203
    their canonical form if it hasn't been done by ExpandNames before.
204

205
    """
206
    raise NotImplementedError
207

    
208
  def Exec(self, feedback_fn):
209
    """Execute the LU.
210

211
    This method should implement the actual work. It should raise
212
    errors.OpExecError for failures that are somewhat dealt with in
213
    code, or expected.
214

215
    """
216
    raise NotImplementedError
217

    
218
  def BuildHooksEnv(self):
219
    """Build hooks environment for this LU.
220

221
    This method should return a three-element tuple consisting of: a dict
222
    containing the environment that will be used for running the
223
    specific hook for this LU, a list of node names on which the hook
224
    should run before the execution, and a list of node names on which
225
    the hook should run after the execution.
226

227
    The keys of the dict must not be prefixed with 'GANETI_', as that
228
    prefixing is handled by the hooks runner. Also note that additional
229
    keys will be added by the hooks runner. If the LU doesn't define any
230
    environment, an empty dict (and not None) should be returned.
231

232
    If there are no nodes to return, use an empty list (and not None).
233

234
    Note that if the HPATH for a LU class is None, this function will
235
    not be called.
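
    A sketch of a typical implementation (the attributes used here are
    illustrative)::

      def BuildHooksEnv(self):
        env = {"OP_TARGET": self.op.instance_name}
        nl = [self.cfg.GetMasterNode(), self.instance.primary_node]
        # the same nodes run the pre- and post-execution hooks here
        return env, nl, nl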
236

237
    """
238
    raise NotImplementedError
239

    
240
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
241
    """Notify the LU about the results of its hooks.
242

243
    This method is called every time a hooks phase is executed, and notifies
244
    the Logical Unit about the hooks' result. The LU can then use it to alter
245
    its result based on the hooks.  By default the method does nothing and the
246
    previous result is passed back unchanged, but any LU can override it if it
247
    wants to use the local cluster hook scripts somehow.
248

249
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
250
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
251
    @param hook_results: the results of the multi-node hooks rpc call
252
    @param feedback_fn: function used to send feedback back to the caller
253
    @param lu_result: the previous Exec result this LU had, or None
254
        in the PRE phase
255
    @return: the new Exec result, based on the previous result
256
        and hook results
257

258
    """
259
    return lu_result
260

    
261
  def _ExpandAndLockInstance(self):
262
    """Helper function to expand and lock an instance.
263

264
    Many LUs that work on an instance take its name in self.op.instance_name
265
    and need to expand it and then declare the expanded name for locking. This
266
    function does it, and then updates self.op.instance_name to the expanded
267
    name. It also initializes needed_locks as a dict, if this hasn't been done
268
    before.
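
    A sketch of the intended usage from a derived LU::

      def ExpandNames(self):
        self._ExpandAndLockInstance()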
269

270
    """
271
    if self.needed_locks is None:
272
      self.needed_locks = {}
273
    else:
274
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
275
        "_ExpandAndLockInstance called with instance-level locks set"
276
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
277
    if expanded_name is None:
278
      raise errors.OpPrereqError("Instance '%s' not known" %
279
                                  self.op.instance_name)
280
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
281
    self.op.instance_name = expanded_name
282

    
283
  def _LockInstancesNodes(self, primary_only=False):
284
    """Helper function to declare instances' nodes for locking.
285

286
    This function should be called after locking one or more instances to lock
287
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
288
    with all primary or secondary nodes for instances already locked and
289
    present in self.needed_locks[locking.LEVEL_INSTANCE].
290

291
    It should be called from DeclareLocks, and for safety only works if
292
    self.recalculate_locks[locking.LEVEL_NODE] is set.
293

294
    In the future it may grow parameters to just lock some instances' nodes, or
295
    to just lock primary or secondary nodes, if needed.
296

297
    It should be called from DeclareLocks in a way similar to::
298

299
      if level == locking.LEVEL_NODE:
300
        self._LockInstancesNodes()
301

302
    @type primary_only: boolean
303
    @param primary_only: only lock primary nodes of locked instances
304

305
    """
306
    assert locking.LEVEL_NODE in self.recalculate_locks, \
307
      "_LockInstancesNodes helper function called with no nodes to recalculate"
308

    
309
    # TODO: check if we've really been called with the instance locks held
310

    
311
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
312
    # future we might want to have different behaviors depending on the value
313
    # of self.recalculate_locks[locking.LEVEL_NODE]
314
    wanted_nodes = []
315
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
316
      instance = self.context.cfg.GetInstanceInfo(instance_name)
317
      wanted_nodes.append(instance.primary_node)
318
      if not primary_only:
319
        wanted_nodes.extend(instance.secondary_nodes)
320

    
321
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
322
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
323
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
324
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
325

    
326
    del self.recalculate_locks[locking.LEVEL_NODE]
327

    
328

    
329
class NoHooksLU(LogicalUnit):
330
  """Simple LU which runs no hooks.
331

332
  This LU is intended as a parent for other LogicalUnits which will
333
  run no hooks, in order to reduce duplicate code.
334

335
  """
336
  HPATH = None
337
  HTYPE = None
338

    
339

    
340
def _GetWantedNodes(lu, nodes):
341
  """Returns list of checked and expanded node names.
342

343
  @type lu: L{LogicalUnit}
344
  @param lu: the logical unit on whose behalf we execute
345
  @type nodes: list
346
  @param nodes: list of node names or None for all nodes
347
  @rtype: list
348
  @return: the list of nodes, sorted
349
  @raise errors.OpPrereqError: if the nodes parameter is of the wrong type
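
  A sketch of typical usage from inside an LU (the opcode attribute is
  illustrative)::

    self.wanted = _GetWantedNodes(self, self.op.nodes)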
350

351
  """
352
  if not isinstance(nodes, list):
353
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
354

    
355
  if not nodes:
356
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
357
      " non-empty list of nodes whose name is to be expanded.")
358

    
359
  wanted = []
360
  for name in nodes:
361
    node = lu.cfg.ExpandNodeName(name)
362
    if node is None:
363
      raise errors.OpPrereqError("No such node name '%s'" % name)
364
    wanted.append(node)
365

    
366
  return utils.NiceSort(wanted)
367

    
368

    
369
def _GetWantedInstances(lu, instances):
370
  """Returns list of checked and expanded instance names.
371

372
  @type lu: L{LogicalUnit}
373
  @param lu: the logical unit on whose behalf we execute
374
  @type instances: list
375
  @param instances: list of instance names or None for all instances
376
  @rtype: list
377
  @return: the list of instances, sorted
378
  @raise errors.OpPrereqError: if the instances parameter is of the wrong type
379
  @raise errors.OpPrereqError: if any of the passed instances is not found
380

381
  """
382
  if not isinstance(instances, list):
383
    raise errors.OpPrereqError("Invalid argument type 'instances'")
384

    
385
  if instances:
386
    wanted = []
387

    
388
    for name in instances:
389
      instance = lu.cfg.ExpandInstanceName(name)
390
      if instance is None:
391
        raise errors.OpPrereqError("No such instance name '%s'" % name)
392
      wanted.append(instance)
393

    
394
  else:
395
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
396
  return wanted
397

    
398

    
399
def _CheckOutputFields(static, dynamic, selected):
400
  """Checks whether all selected fields are valid.
401

402
  @type static: L{utils.FieldSet}
403
  @param static: static fields set
404
  @type dynamic: L{utils.FieldSet}
405
  @param dynamic: dynamic fields set
  @type selected: list
  @param selected: the list of selected output fields to check
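
  A sketch of the intended usage (the field names are illustrative)::

    static_fields = utils.FieldSet()
    static_fields.Extend(["name", "pinst_cnt"])
    dynamic_fields = utils.FieldSet()
    dynamic_fields.Extend(["free_memory"])
    _CheckOutputFields(static=static_fields, dynamic=dynamic_fields,
                       selected=self.op.output_fields)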
406

407
  """
408
  f = utils.FieldSet()
409
  f.Extend(static)
410
  f.Extend(dynamic)
411

    
412
  delta = f.NonMatching(selected)
413
  if delta:
414
    raise errors.OpPrereqError("Unknown output fields selected: %s"
415
                               % ",".join(delta))
416

    
417

    
418
def _CheckBooleanOpField(op, name):
419
  """Validates boolean opcode parameters.
420

421
  This will ensure that an opcode parameter is either a boolean value,
422
  or None (but that it always exists).
423

424
  """
425
  val = getattr(op, name, None)
426
  if not (val is None or isinstance(val, bool)):
427
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
428
                               (name, str(val)))
429
  setattr(op, name, val)
430

    
431

    
432
def _CheckNodeOnline(lu, node):
433
  """Ensure that a given node is online.
434

435
  @param lu: the LU on behalf of which we make the check
436
  @param node: the node to check
437
  @raise errors.OpPrereqError: if the node is offline
438

439
  """
440
  if lu.cfg.GetNodeInfo(node).offline:
441
    raise errors.OpPrereqError("Can't use offline node %s" % node)
442

    
443

    
444
def _CheckNodeNotDrained(lu, node):
445
  """Ensure that a given node is not drained.
446

447
  @param lu: the LU on behalf of which we make the check
448
  @param node: the node to check
449
  @raise errors.OpPrereqError: if the node is drained
450

451
  """
452
  if lu.cfg.GetNodeInfo(node).drained:
453
    raise errors.OpPrereqError("Can't use drained node %s" % node)
454

    
455

    
456
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
457
                          memory, vcpus, nics, disk_template, disks):
458
  """Builds instance related env variables for hooks
459

460
  This builds the hook environment from individual variables.
461

462
  @type name: string
463
  @param name: the name of the instance
464
  @type primary_node: string
465
  @param primary_node: the name of the instance's primary node
466
  @type secondary_nodes: list
467
  @param secondary_nodes: list of secondary nodes as strings
468
  @type os_type: string
469
  @param os_type: the name of the instance's OS
470
  @type status: boolean
471
  @param status: the should_run status of the instance
472
  @type memory: string
473
  @param memory: the memory size of the instance
474
  @type vcpus: string
475
  @param vcpus: the count of VCPUs the instance has
476
  @type nics: list
477
  @param nics: list of tuples (ip, bridge, mac) representing
478
      the NICs the instance  has
479
  @type disk_template: string
480
  @param disk_template: the disk template of the instance
481
  @type disks: list
482
  @param disks: the list of (size, mode) pairs
483
  @rtype: dict
484
  @return: the hook environment for this instance
485

486
  """
487
  if status:
488
    str_status = "up"
489
  else:
490
    str_status = "down"
491
  env = {
492
    "OP_TARGET": name,
493
    "INSTANCE_NAME": name,
494
    "INSTANCE_PRIMARY": primary_node,
495
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
496
    "INSTANCE_OS_TYPE": os_type,
497
    "INSTANCE_STATUS": str_status,
498
    "INSTANCE_MEMORY": memory,
499
    "INSTANCE_VCPUS": vcpus,
500
    "INSTANCE_DISK_TEMPLATE": disk_template,
501
  }
502

    
503
  if nics:
504
    nic_count = len(nics)
505
    for idx, (ip, bridge, mac) in enumerate(nics):
506
      if ip is None:
507
        ip = ""
508
      env["INSTANCE_NIC%d_IP" % idx] = ip
509
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
510
      env["INSTANCE_NIC%d_MAC" % idx] = mac
511
  else:
512
    nic_count = 0
513

    
514
  env["INSTANCE_NIC_COUNT"] = nic_count
515

    
516
  if disks:
517
    disk_count = len(disks)
518
    for idx, (size, mode) in enumerate(disks):
519
      env["INSTANCE_DISK%d_SIZE" % idx] = size
520
      env["INSTANCE_DISK%d_MODE" % idx] = mode
521
  else:
522
    disk_count = 0
523

    
524
  env["INSTANCE_DISK_COUNT"] = disk_count
525

    
526
  return env
527

    
528

    
529
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
530
  """Builds instance related env variables for hooks from an object.
531

532
  @type lu: L{LogicalUnit}
533
  @param lu: the logical unit on whose behalf we execute
534
  @type instance: L{objects.Instance}
535
  @param instance: the instance for which we should build the
536
      environment
537
  @type override: dict
538
  @param override: dictionary with key/values that will override
539
      our values
540
  @rtype: dict
541
  @return: the hook environment dictionary
542

543
  """
544
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
545
  args = {
546
    'name': instance.name,
547
    'primary_node': instance.primary_node,
548
    'secondary_nodes': instance.secondary_nodes,
549
    'os_type': instance.os,
550
    'status': instance.admin_up,
551
    'memory': bep[constants.BE_MEMORY],
552
    'vcpus': bep[constants.BE_VCPUS],
553
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
554
    'disk_template': instance.disk_template,
555
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
556
  }
557
  if override:
558
    args.update(override)
559
  return _BuildInstanceHookEnv(**args)
560

    
561

    
562
def _AdjustCandidatePool(lu):
563
  """Adjust the candidate pool after node operations.
564

565
  """
566
  mod_list = lu.cfg.MaintainCandidatePool()
567
  if mod_list:
568
    lu.LogInfo("Promoted nodes to master candidate role: %s",
569
               ", ".join(node.name for node in mod_list))
570
    for name in mod_list:
571
      lu.context.ReaddNode(name)
572
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
573
  if mc_now > mc_max:
574
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
575
               (mc_now, mc_max))
576

    
577

    
578
def _CheckInstanceBridgesExist(lu, instance):
579
  """Check that the brigdes needed by an instance exist.
580

581
  """
582
  # check bridges existence
583
  brlist = [nic.bridge for nic in instance.nics]
584
  result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
585
  result.Raise()
586
  if not result.data:
587
    raise errors.OpPrereqError("One or more target bridges %s does not"
588
                               " exist on destination node '%s'" %
589
                               (brlist, instance.primary_node))
590

    
591

    
592
class LUDestroyCluster(NoHooksLU):
593
  """Logical unit for destroying the cluster.
594

595
  """
596
  _OP_REQP = []
597

    
598
  def CheckPrereq(self):
599
    """Check prerequisites.
600

601
    This checks whether the cluster is empty.
602

603
    Any errors are signalled by raising errors.OpPrereqError.
604

605
    """
606
    master = self.cfg.GetMasterNode()
607

    
608
    nodelist = self.cfg.GetNodeList()
609
    if len(nodelist) != 1 or nodelist[0] != master:
610
      raise errors.OpPrereqError("There are still %d node(s) in"
611
                                 " this cluster." % (len(nodelist) - 1))
612
    instancelist = self.cfg.GetInstanceList()
613
    if instancelist:
614
      raise errors.OpPrereqError("There are still %d instance(s) in"
615
                                 " this cluster." % len(instancelist))
616

    
617
  def Exec(self, feedback_fn):
618
    """Destroys the cluster.
619

620
    """
621
    master = self.cfg.GetMasterNode()
622
    result = self.rpc.call_node_stop_master(master, False)
623
    result.Raise()
624
    if not result.data:
625
      raise errors.OpExecError("Could not disable the master role")
626
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
627
    utils.CreateBackup(priv_key)
628
    utils.CreateBackup(pub_key)
629
    return master
630

    
631

    
632
class LUVerifyCluster(LogicalUnit):
633
  """Verifies the cluster status.
634

635
  """
636
  HPATH = "cluster-verify"
637
  HTYPE = constants.HTYPE_CLUSTER
638
  _OP_REQP = ["skip_checks"]
639
  REQ_BGL = False
640

    
641
  def ExpandNames(self):
642
    self.needed_locks = {
643
      locking.LEVEL_NODE: locking.ALL_SET,
644
      locking.LEVEL_INSTANCE: locking.ALL_SET,
645
    }
646
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
647

    
648
  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
649
                  node_result, feedback_fn, master_files,
650
                  drbd_map, vg_name):
651
    """Run multiple tests against a node.
652

653
    Test list:
654

655
      - compares ganeti version
656
      - checks vg existence and size > 20G
657
      - checks config file checksum
658
      - checks ssh to other nodes
659

660
    @type nodeinfo: L{objects.Node}
661
    @param nodeinfo: the node to check
662
    @param file_list: required list of files
663
    @param local_cksum: dictionary of local files and their checksums
664
    @param node_result: the results from the node
665
    @param feedback_fn: function used to accumulate results
666
    @param master_files: list of files that only masters should have
667
    @param drbd_map: the used drbd minors for this node, in
668
        the form of minor: (instance, must_exist) which correspond to instances
669
        and their running status
670
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
671

672
    """
673
    node = nodeinfo.name
674

    
675
    # main result, node_result should be a non-empty dict
676
    if not node_result or not isinstance(node_result, dict):
677
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
678
      return True
679

    
680
    # compares ganeti version
681
    local_version = constants.PROTOCOL_VERSION
682
    remote_version = node_result.get('version', None)
683
    if not (remote_version and isinstance(remote_version, (list, tuple)) and
684
            len(remote_version) == 2):
685
      feedback_fn("  - ERROR: connection to %s failed" % (node))
686
      return True
687

    
688
    if local_version != remote_version[0]:
689
      feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
690
                  " node %s %s" % (local_version, node, remote_version[0]))
691
      return True
692

    
693
    # node seems compatible, we can actually try to look into its results
694

    
695
    bad = False
696

    
697
    # full package version
698
    if constants.RELEASE_VERSION != remote_version[1]:
699
      feedback_fn("  - WARNING: software version mismatch: master %s,"
700
                  " node %s %s" %
701
                  (constants.RELEASE_VERSION, node, remote_version[1]))
702

    
703
    # checks vg existence and size > 20G
704
    if vg_name is not None:
705
      vglist = node_result.get(constants.NV_VGLIST, None)
706
      if not vglist:
707
        feedback_fn("  - ERROR: unable to check volume groups on node %s." %
708
                        (node,))
709
        bad = True
710
      else:
711
        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
712
                                              constants.MIN_VG_SIZE)
713
        if vgstatus:
714
          feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
715
          bad = True
716

    
717
    # checks config file checksum
718

    
719
    remote_cksum = node_result.get(constants.NV_FILELIST, None)
720
    if not isinstance(remote_cksum, dict):
721
      bad = True
722
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
723
    else:
724
      for file_name in file_list:
725
        node_is_mc = nodeinfo.master_candidate
726
        must_have_file = file_name not in master_files
727
        if file_name not in remote_cksum:
728
          if node_is_mc or must_have_file:
729
            bad = True
730
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
731
        elif remote_cksum[file_name] != local_cksum[file_name]:
732
          if node_is_mc or must_have_file:
733
            bad = True
734
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
735
          else:
736
            # not candidate and this is not a must-have file
737
            bad = True
738
            feedback_fn("  - ERROR: non master-candidate has old/wrong file"
739
                        " '%s'" % file_name)
740
        else:
741
          # all good, except non-master/non-must have combination
742
          if not node_is_mc and not must_have_file:
743
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
744
                        " candidates" % file_name)
745

    
746
    # checks ssh to any
747

    
748
    if constants.NV_NODELIST not in node_result:
749
      bad = True
750
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
751
    else:
752
      if node_result[constants.NV_NODELIST]:
753
        bad = True
754
        for node in node_result[constants.NV_NODELIST]:
755
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
756
                          (node, node_result[constants.NV_NODELIST][node]))
757

    
758
    if constants.NV_NODENETTEST not in node_result:
759
      bad = True
760
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
761
    else:
762
      if node_result[constants.NV_NODENETTEST]:
763
        bad = True
764
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
765
        for node in nlist:
766
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
767
                          (node, node_result[constants.NV_NODENETTEST][node]))
768

    
769
    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
770
    if isinstance(hyp_result, dict):
771
      for hv_name, hv_result in hyp_result.iteritems():
772
        if hv_result is not None:
773
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
774
                      (hv_name, hv_result))
775

    
776
    # check used drbd list
777
    if vg_name is not None:
778
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
779
      if not isinstance(used_minors, (tuple, list)):
780
        feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
781
                    str(used_minors))
782
      else:
783
        for minor, (iname, must_exist) in drbd_map.items():
784
          if minor not in used_minors and must_exist:
785
            feedback_fn("  - ERROR: drbd minor %d of instance %s is"
786
                        " not active" % (minor, iname))
787
            bad = True
788
        for minor in used_minors:
789
          if minor not in drbd_map:
790
            feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
791
                        minor)
792
            bad = True
793

    
794
    return bad
795

    
796
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
797
                      node_instance, feedback_fn, n_offline):
798
    """Verify an instance.
799

800
    This function checks to see if the required block devices are
801
    available on the instance's nodes.
802

803
    """
804
    bad = False
805

    
806
    node_current = instanceconfig.primary_node
807

    
808
    node_vol_should = {}
809
    instanceconfig.MapLVsByNode(node_vol_should)
810

    
811
    for node in node_vol_should:
812
      if node in n_offline:
813
        # ignore missing volumes on offline nodes
814
        continue
815
      for volume in node_vol_should[node]:
816
        if node not in node_vol_is or volume not in node_vol_is[node]:
817
          feedback_fn("  - ERROR: volume %s missing on node %s" %
818
                          (volume, node))
819
          bad = True
820

    
821
    if instanceconfig.admin_up:
822
      if ((node_current not in node_instance or
823
          not instance in node_instance[node_current]) and
824
          node_current not in n_offline):
825
        feedback_fn("  - ERROR: instance %s not running on node %s" %
826
                        (instance, node_current))
827
        bad = True
828

    
829
    for node in node_instance:
830
      if (not node == node_current):
831
        if instance in node_instance[node]:
832
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
833
                          (instance, node))
834
          bad = True
835

    
836
    return bad
837

    
838
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
839
    """Verify if there are any unknown volumes in the cluster.
840

841
    The .os, .swap and backup volumes are ignored. All other volumes are
842
    reported as unknown.
843

844
    """
845
    bad = False
846

    
847
    for node in node_vol_is:
848
      for volume in node_vol_is[node]:
849
        if node not in node_vol_should or volume not in node_vol_should[node]:
850
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
851
                      (volume, node))
852
          bad = True
853
    return bad
854

    
855
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
856
    """Verify the list of running instances.
857

858
    This checks what instances are running but unknown to the cluster.
859

860
    """
861
    bad = False
862
    for node in node_instance:
863
      for runninginstance in node_instance[node]:
864
        if runninginstance not in instancelist:
865
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
866
                          (runninginstance, node))
867
          bad = True
868
    return bad
869

    
870
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
871
    """Verify N+1 Memory Resilience.
872

873
    Check that if one single node dies we can still start all the instances it
874
    was primary for.
875

876
    """
877
    bad = False
878

    
879
    for node, nodeinfo in node_info.iteritems():
880
      # This code checks that every node which is now listed as secondary has
881
      # enough memory to host all instances it is supposed to, should a single
882
      # other node in the cluster fail.
883
      # FIXME: not ready for failover to an arbitrary node
884
      # FIXME: does not support file-backed instances
885
      # WARNING: we currently take into account down instances as well as up
886
      # ones, considering that even if they're down someone might want to start
887
      # them even in the event of a node failure.
888
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
889
        needed_mem = 0
890
        for instance in instances:
891
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
892
          if bep[constants.BE_AUTO_BALANCE]:
893
            needed_mem += bep[constants.BE_MEMORY]
894
        if nodeinfo['mfree'] < needed_mem:
895
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
896
                      " failovers should node %s fail" % (node, prinode))
897
          bad = True
898
    return bad
899

    
900
  def CheckPrereq(self):
901
    """Check prerequisites.
902

903
    Transform the list of checks we're going to skip into a set and check that
904
    all its members are valid.
905

906
    """
907
    self.skip_set = frozenset(self.op.skip_checks)
908
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
909
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
910

    
911
  def BuildHooksEnv(self):
912
    """Build hooks env.
913

914
    Cluster-Verify hooks are only run in the post phase; their failure causes
915
    the output to be logged in the verify output and the verification to fail.
916

917
    """
918
    all_nodes = self.cfg.GetNodeList()
919
    env = {
920
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
921
      }
922
    for node in self.cfg.GetAllNodesInfo().values():
923
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
924

    
925
    return env, [], all_nodes
926

    
927
  def Exec(self, feedback_fn):
928
    """Verify integrity of cluster, performing various test on nodes.
929

930
    """
931
    bad = False
932
    feedback_fn("* Verifying global settings")
933
    for msg in self.cfg.VerifyConfig():
934
      feedback_fn("  - ERROR: %s" % msg)
935

    
936
    vg_name = self.cfg.GetVGName()
937
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
938
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
939
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
940
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
941
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
942
                        for iname in instancelist)
943
    i_non_redundant = [] # Non redundant instances
944
    i_non_a_balanced = [] # Non auto-balanced instances
945
    n_offline = [] # List of offline nodes
946
    n_drained = [] # List of nodes being drained
947
    node_volume = {}
948
    node_instance = {}
949
    node_info = {}
950
    instance_cfg = {}
951

    
952
    # FIXME: verify OS list
953
    # do local checksums
954
    master_files = [constants.CLUSTER_CONF_FILE]
955

    
956
    file_names = ssconf.SimpleStore().GetFileList()
957
    file_names.append(constants.SSL_CERT_FILE)
958
    file_names.append(constants.RAPI_CERT_FILE)
959
    file_names.extend(master_files)
960

    
961
    local_checksums = utils.FingerprintFiles(file_names)
962

    
963
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
964
    node_verify_param = {
965
      constants.NV_FILELIST: file_names,
966
      constants.NV_NODELIST: [node.name for node in nodeinfo
967
                              if not node.offline],
968
      constants.NV_HYPERVISOR: hypervisors,
969
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
970
                                  node.secondary_ip) for node in nodeinfo
971
                                 if not node.offline],
972
      constants.NV_INSTANCELIST: hypervisors,
973
      constants.NV_VERSION: None,
974
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
975
      }
976
    if vg_name is not None:
977
      node_verify_param[constants.NV_VGLIST] = None
978
      node_verify_param[constants.NV_LVLIST] = vg_name
979
      node_verify_param[constants.NV_DRBDLIST] = None
980
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
981
                                           self.cfg.GetClusterName())
982

    
983
    cluster = self.cfg.GetClusterInfo()
984
    master_node = self.cfg.GetMasterNode()
985
    all_drbd_map = self.cfg.ComputeDRBDMap()
986

    
987
    for node_i in nodeinfo:
988
      node = node_i.name
989
      nresult = all_nvinfo[node].data
990

    
991
      if node_i.offline:
992
        feedback_fn("* Skipping offline node %s" % (node,))
993
        n_offline.append(node)
994
        continue
995

    
996
      if node == master_node:
997
        ntype = "master"
998
      elif node_i.master_candidate:
999
        ntype = "master candidate"
1000
      elif node_i.drained:
1001
        ntype = "drained"
1002
        n_drained.append(node)
1003
      else:
1004
        ntype = "regular"
1005
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1006

    
1007
      if all_nvinfo[node].failed or not isinstance(nresult, dict):
1008
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
1009
        bad = True
1010
        continue
1011

    
1012
      node_drbd = {}
1013
      for minor, instance in all_drbd_map[node].items():
1014
        instance = instanceinfo[instance]
1015
        node_drbd[minor] = (instance.name, instance.admin_up)
1016
      result = self._VerifyNode(node_i, file_names, local_checksums,
1017
                                nresult, feedback_fn, master_files,
1018
                                node_drbd, vg_name)
1019
      bad = bad or result
1020

    
1021
      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1022
      if vg_name is None:
1023
        node_volume[node] = {}
1024
      elif isinstance(lvdata, basestring):
1025
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1026
                    (node, utils.SafeEncode(lvdata)))
1027
        bad = True
1028
        node_volume[node] = {}
1029
      elif not isinstance(lvdata, dict):
1030
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1031
        bad = True
1032
        continue
1033
      else:
1034
        node_volume[node] = lvdata
1035

    
1036
      # node_instance
1037
      idata = nresult.get(constants.NV_INSTANCELIST, None)
1038
      if not isinstance(idata, list):
1039
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1040
                    (node,))
1041
        bad = True
1042
        continue
1043

    
1044
      node_instance[node] = idata
1045

    
1046
      # node_info
1047
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
1048
      if not isinstance(nodeinfo, dict):
1049
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1050
        bad = True
1051
        continue
1052

    
1053
      try:
1054
        node_info[node] = {
1055
          "mfree": int(nodeinfo['memory_free']),
1056
          "pinst": [],
1057
          "sinst": [],
1058
          # dictionary holding all instances this node is secondary for,
1059
          # grouped by their primary node. Each key is a cluster node, and each
1060
          # value is a list of instances which have the key as primary and the
1061
          # current node as secondary.  this is handy to calculate N+1 memory
1062
          # availability if you can only failover from a primary to its
1063
          # secondary.
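          # For example (with hypothetical names), {"nodeA": ["inst1"]} here
          # means that the current node is secondary for inst1, whose primary
          # node is nodeA.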
1064
          "sinst-by-pnode": {},
1065
        }
1066
        # FIXME: devise a free space model for file based instances as well
1067
        if vg_name is not None:
1068
          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1069
      except ValueError:
1070
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
1071
        bad = True
1072
        continue
1073

    
1074
    node_vol_should = {}
1075

    
1076
    for instance in instancelist:
1077
      feedback_fn("* Verifying instance %s" % instance)
1078
      inst_config = instanceinfo[instance]
1079
      result =  self._VerifyInstance(instance, inst_config, node_volume,
1080
                                     node_instance, feedback_fn, n_offline)
1081
      bad = bad or result
1082
      inst_nodes_offline = []
1083

    
1084
      inst_config.MapLVsByNode(node_vol_should)
1085

    
1086
      instance_cfg[instance] = inst_config
1087

    
1088
      pnode = inst_config.primary_node
1089
      if pnode in node_info:
1090
        node_info[pnode]['pinst'].append(instance)
1091
      elif pnode not in n_offline:
1092
        feedback_fn("  - ERROR: instance %s, connection to primary node"
1093
                    " %s failed" % (instance, pnode))
1094
        bad = True
1095

    
1096
      if pnode in n_offline:
1097
        inst_nodes_offline.append(pnode)
1098

    
1099
      # If the instance is non-redundant we cannot survive losing its primary
1100
      # node, so we are not N+1 compliant. On the other hand we have no disk
1101
      # templates with more than one secondary so that situation is not well
1102
      # supported either.
1103
      # FIXME: does not support file-backed instances
1104
      if len(inst_config.secondary_nodes) == 0:
1105
        i_non_redundant.append(instance)
1106
      elif len(inst_config.secondary_nodes) > 1:
1107
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
1108
                    % instance)
1109

    
1110
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1111
        i_non_a_balanced.append(instance)
1112

    
1113
      for snode in inst_config.secondary_nodes:
1114
        if snode in node_info:
1115
          node_info[snode]['sinst'].append(instance)
1116
          if pnode not in node_info[snode]['sinst-by-pnode']:
1117
            node_info[snode]['sinst-by-pnode'][pnode] = []
1118
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1119
        elif snode not in n_offline:
1120
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
1121
                      " %s failed" % (instance, snode))
1122
          bad = True
1123
        if snode in n_offline:
1124
          inst_nodes_offline.append(snode)
1125

    
1126
      if inst_nodes_offline:
1127
        # warn that the instance lives on offline nodes, and set bad=True
1128
        feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1129
                    ", ".join(inst_nodes_offline))
1130
        bad = True
1131

    
1132
    feedback_fn("* Verifying orphan volumes")
1133
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1134
                                       feedback_fn)
1135
    bad = bad or result
1136

    
1137
    feedback_fn("* Verifying remaining instances")
1138
    result = self._VerifyOrphanInstances(instancelist, node_instance,
1139
                                         feedback_fn)
1140
    bad = bad or result
1141

    
1142
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1143
      feedback_fn("* Verifying N+1 Memory redundancy")
1144
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1145
      bad = bad or result
1146

    
1147
    feedback_fn("* Other Notes")
1148
    if i_non_redundant:
1149
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1150
                  % len(i_non_redundant))
1151

    
1152
    if i_non_a_balanced:
1153
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1154
                  % len(i_non_a_balanced))
1155

    
1156
    if n_offline:
1157
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1158

    
1159
    if n_drained:
1160
      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1161

    
1162
    return not bad
1163

    
1164
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1165
    """Analize the post-hooks' result
1166

1167
    This method analyses the hook result, handles it, and sends some
1168
    nicely-formatted feedback back to the user.
1169

1170
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
1171
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1172
    @param hooks_results: the results of the multi-node hooks rpc call
1173
    @param feedback_fn: function used to send feedback back to the caller
1174
    @param lu_result: previous Exec result
1175
    @return: the new Exec result, based on the previous result
1176
        and hook results
1177

1178
    """
1179
    # We only really run POST phase hooks, and are only interested in
1180
    # their results
1181
    if phase == constants.HOOKS_PHASE_POST:
1182
      # Used to change hooks' output to proper indentation
1183
      indent_re = re.compile('^', re.M)
1184
      feedback_fn("* Hooks Results")
1185
      if not hooks_results:
1186
        feedback_fn("  - ERROR: general communication failure")
1187
        lu_result = 1
1188
      else:
1189
        for node_name in hooks_results:
1190
          show_node_header = True
1191
          res = hooks_results[node_name]
1192
          if res.failed or res.data is False or not isinstance(res.data, list):
1193
            if res.offline:
1194
              # no need to warn or set fail return value
1195
              continue
1196
            feedback_fn("    Communication failure in hooks execution")
1197
            lu_result = 1
1198
            continue
1199
          for script, hkr, output in res.data:
1200
            if hkr == constants.HKR_FAIL:
1201
              # The node header is only shown once, if there are
1202
              # failing hooks on that node
1203
              if show_node_header:
1204
                feedback_fn("  Node %s:" % node_name)
1205
                show_node_header = False
1206
              feedback_fn("    ERROR: Script %s failed, output:" % script)
1207
              output = indent_re.sub('      ', output)
1208
              feedback_fn("%s" % output)
1209
              lu_result = 1
1210

    
1211
      return lu_result
1212

    
1213

    
1214
class LUVerifyDisks(NoHooksLU):
1215
  """Verifies the cluster disks status.
1216

1217
  """
1218
  _OP_REQP = []
1219
  REQ_BGL = False
1220

    
1221
  def ExpandNames(self):
1222
    self.needed_locks = {
1223
      locking.LEVEL_NODE: locking.ALL_SET,
1224
      locking.LEVEL_INSTANCE: locking.ALL_SET,
1225
    }
1226
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1227

    
1228
  def CheckPrereq(self):
1229
    """Check prerequisites.
1230

1231
    This has no prerequisites.
1232

1233
    """
1234
    pass
1235

    
1236
  def Exec(self, feedback_fn):
1237
    """Verify integrity of cluster disks.
1238

1239
    """
1240
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1241

    
1242
    vg_name = self.cfg.GetVGName()
1243
    nodes = utils.NiceSort(self.cfg.GetNodeList())
1244
    instances = [self.cfg.GetInstanceInfo(name)
1245
                 for name in self.cfg.GetInstanceList()]
1246

    
1247
    nv_dict = {}
1248
    for inst in instances:
1249
      inst_lvs = {}
1250
      if (not inst.admin_up or
1251
          inst.disk_template not in constants.DTS_NET_MIRROR):
1252
        continue
1253
      inst.MapLVsByNode(inst_lvs)
1254
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1255
      for node, vol_list in inst_lvs.iteritems():
1256
        for vol in vol_list:
1257
          nv_dict[(node, vol)] = inst
1258

    
1259
    if not nv_dict:
1260
      return result
1261

    
1262
    node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1263

    
1264
    to_act = set()
1265
    for node in nodes:
1266
      # node_volume
1267
      lvs = node_lvs[node]
1268
      if lvs.failed:
1269
        if not lvs.offline:
1270
          self.LogWarning("Connection to node %s failed: %s" %
1271
                          (node, lvs.data))
1272
        continue
1273
      lvs = lvs.data
1274
      if isinstance(lvs, basestring):
1275
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1276
        res_nlvm[node] = lvs
1277
      elif not isinstance(lvs, dict):
1278
        logging.warning("Connection to node %s failed or invalid data"
1279
                        " returned", node)
1280
        res_nodes.append(node)
1281
        continue
1282

    
1283
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1284
        inst = nv_dict.pop((node, lv_name), None)
1285
        if (not lv_online and inst is not None
1286
            and inst.name not in res_instances):
1287
          res_instances.append(inst.name)
1288

    
1289
    # any leftover items in nv_dict are missing LVs, let's arrange the
1290
    # data better
1291
    for key, inst in nv_dict.iteritems():
1292
      if inst.name not in res_missing:
1293
        res_missing[inst.name] = []
1294
      res_missing[inst.name].append(key)
1295

    
1296
    return result
1297

    
1298

    
1299
class LURenameCluster(LogicalUnit):
1300
  """Rename the cluster.
1301

1302
  """
1303
  HPATH = "cluster-rename"
1304
  HTYPE = constants.HTYPE_CLUSTER
1305
  _OP_REQP = ["name"]
1306

    
1307
  def BuildHooksEnv(self):
1308
    """Build hooks env.
1309

1310
    """
1311
    env = {
1312
      "OP_TARGET": self.cfg.GetClusterName(),
1313
      "NEW_NAME": self.op.name,
1314
      }
1315
    mn = self.cfg.GetMasterNode()
1316
    return env, [mn], [mn]
1317

    
1318
  def CheckPrereq(self):
1319
    """Verify that the passed name is a valid one.
1320

1321
    """
1322
    hostname = utils.HostInfo(self.op.name)
1323

    
1324
    new_name = hostname.name
1325
    self.ip = new_ip = hostname.ip
1326
    old_name = self.cfg.GetClusterName()
1327
    old_ip = self.cfg.GetMasterIP()
1328
    if new_name == old_name and new_ip == old_ip:
1329
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1330
                                 " cluster has changed")
1331
    if new_ip != old_ip:
1332
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1333
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1334
                                   " reachable on the network. Aborting." %
1335
                                   new_ip)
1336

    
1337
    self.op.name = new_name
1338

    
1339
  def Exec(self, feedback_fn):
1340
    """Rename the cluster.
1341

1342
    """
1343
    clustername = self.op.name
1344
    ip = self.ip
1345

    
1346
    # shutdown the master IP
1347
    master = self.cfg.GetMasterNode()
1348
    result = self.rpc.call_node_stop_master(master, False)
1349
    if result.failed or not result.data:
1350
      raise errors.OpExecError("Could not disable the master role")
1351

    
1352
    try:
1353
      cluster = self.cfg.GetClusterInfo()
1354
      cluster.cluster_name = clustername
1355
      cluster.master_ip = ip
1356
      self.cfg.Update(cluster)
1357

    
1358
      # update the known hosts file
1359
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1360
      node_list = self.cfg.GetNodeList()
1361
      try:
1362
        node_list.remove(master)
1363
      except ValueError:
1364
        pass
1365
      result = self.rpc.call_upload_file(node_list,
1366
                                         constants.SSH_KNOWN_HOSTS_FILE)
1367
      for to_node, to_result in result.iteritems():
1368
        if to_result.failed or not to_result.data:
1369
          logging.error("Copy of file %s to node %s failed",
1370
                        constants.SSH_KNOWN_HOSTS_FILE, to_node)
1371

    
1372
    finally:
1373
      result = self.rpc.call_node_start_master(master, False)
1374
      if result.failed or not result.data:
1375
        self.LogWarning("Could not re-enable the master role on"
1376
                        " the master, please restart manually.")
1377

    
1378

    
1379
def _RecursiveCheckIfLVMBased(disk):
1380
  """Check if the given disk or its children are lvm-based.
1381

1382
  @type disk: L{objects.Disk}
1383
  @param disk: the disk to check
1384
  @rtype: boolean
1385
  @return: boolean indicating whether an LD_LV dev_type was found or not
1386

1387
  """
1388
  if disk.children:
1389
    for chdisk in disk.children:
1390
      if _RecursiveCheckIfLVMBased(chdisk):
1391
        return True
1392
  return disk.dev_type == constants.LD_LV
1393

    
1394

    
1395
class LUSetClusterParams(LogicalUnit):
1396
  """Change the parameters of the cluster.
1397

1398
  """
1399
  HPATH = "cluster-modify"
1400
  HTYPE = constants.HTYPE_CLUSTER
1401
  _OP_REQP = []
1402
  REQ_BGL = False
1403

    
1404
  def CheckParameters(self):
1405
    """Check parameters
1406

1407
    """
1408
    if not hasattr(self.op, "candidate_pool_size"):
1409
      self.op.candidate_pool_size = None
1410
    if self.op.candidate_pool_size is not None:
1411
      try:
1412
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1413
      except ValueError, err:
1414
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1415
                                   str(err))
1416
      if self.op.candidate_pool_size < 1:
1417
        raise errors.OpPrereqError("At least one master candidate needed")
1418

    
1419
  def ExpandNames(self):
1420
    # FIXME: in the future maybe other cluster params won't require checking on
1421
    # all nodes to be modified.
1422
    self.needed_locks = {
1423
      locking.LEVEL_NODE: locking.ALL_SET,
1424
    }
1425
    self.share_locks[locking.LEVEL_NODE] = 1
1426

    
1427
  def BuildHooksEnv(self):
1428
    """Build hooks env.
1429

1430
    """
1431
    env = {
1432
      "OP_TARGET": self.cfg.GetClusterName(),
1433
      "NEW_VG_NAME": self.op.vg_name,
1434
      }
1435
    mn = self.cfg.GetMasterNode()
1436
    return env, [mn], [mn]
1437

    
1438
  def CheckPrereq(self):
1439
    """Check prerequisites.
1440

1441
    This checks whether the given params don't conflict and
1442
    if the given volume group is valid.
1443

1444
    """
1445
    if self.op.vg_name is not None and not self.op.vg_name:
1446
      instances = self.cfg.GetAllInstancesInfo().values()
1447
      for inst in instances:
1448
        for disk in inst.disks:
1449
          if _RecursiveCheckIfLVMBased(disk):
1450
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1451
                                       " lvm-based instances exist")
1452

    
1453
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1454

    
1455
    # if vg_name not None, checks given volume group on all nodes
1456
    if self.op.vg_name:
1457
      vglist = self.rpc.call_vg_list(node_list)
1458
      for node in node_list:
1459
        if vglist[node].failed:
1460
          # ignoring down node
1461
          self.LogWarning("Node %s unreachable/error, ignoring" % node)
1462
          continue
1463
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1464
                                              self.op.vg_name,
1465
                                              constants.MIN_VG_SIZE)
1466
        if vgstatus:
1467
          raise errors.OpPrereqError("Error on node '%s': %s" %
1468
                                     (node, vgstatus))
1469

    
1470
    self.cluster = cluster = self.cfg.GetClusterInfo()
1471
    # validate beparams changes
1472
    if self.op.beparams:
1473
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1474
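      # FillDict merges the submitted values over the current
      # cluster-level defaults; keys not given keep their existing values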
      self.new_beparams = cluster.FillDict(
1475
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1476

    
1477
    # hypervisor list/parameters
1478
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1479
    if self.op.hvparams:
1480
      if not isinstance(self.op.hvparams, dict):
1481
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1482
      for hv_name, hv_dict in self.op.hvparams.items():
1483
        if hv_name not in self.new_hvparams:
1484
          self.new_hvparams[hv_name] = hv_dict
1485
        else:
1486
          self.new_hvparams[hv_name].update(hv_dict)
1487

    
1488
    if self.op.enabled_hypervisors is not None:
1489
      self.hv_list = self.op.enabled_hypervisors
1490
    else:
1491
      self.hv_list = cluster.enabled_hypervisors
1492

    
1493
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1494
      # either the enabled list has changed, or the parameters have, validate
1495
      for hv_name, hv_params in self.new_hvparams.items():
1496
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1497
            (self.op.enabled_hypervisors and
1498
             hv_name in self.op.enabled_hypervisors)):
1499
          # either this is a new hypervisor, or its parameters have changed
1500
          hv_class = hypervisor.GetHypervisor(hv_name)
1501
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1502
          hv_class.CheckParameterSyntax(hv_params)
1503
          _CheckHVParams(self, node_list, hv_name, hv_params)
1504

    
1505
  def Exec(self, feedback_fn):
1506
    """Change the parameters of the cluster.
1507

1508
    """
1509
    if self.op.vg_name is not None:
1510
      if self.op.vg_name != self.cfg.GetVGName():
1511
        self.cfg.SetVGName(self.op.vg_name)
1512
      else:
1513
        feedback_fn("Cluster LVM configuration already in desired"
1514
                    " state, not changing")
1515
    if self.op.hvparams:
1516
      self.cluster.hvparams = self.new_hvparams
1517
    if self.op.enabled_hypervisors is not None:
1518
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1519
    if self.op.beparams:
1520
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1521
    if self.op.candidate_pool_size is not None:
1522
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1523

    
1524
    self.cfg.Update(self.cluster)
1525

    
1526
    # we want to update nodes after the cluster so that if any errors
1527
    # happen, we have recorded and saved the cluster info
1528
    if self.op.candidate_pool_size is not None:
1529
      _AdjustCandidatePool(self)
1530

    
1531

    
1532
class LURedistributeConfig(NoHooksLU):
1533
  """Force the redistribution of cluster configuration.
1534

1535
  This is a very simple LU.
1536

1537
  """
1538
  _OP_REQP = []
1539
  REQ_BGL = False
1540

    
1541
  def ExpandNames(self):
1542
    self.needed_locks = {
1543
      locking.LEVEL_NODE: locking.ALL_SET,
1544
    }
1545
    self.share_locks[locking.LEVEL_NODE] = 1
1546

    
1547
  def CheckPrereq(self):
1548
    """Check prerequisites.
1549

1550
    """
1551

    
1552
  def Exec(self, feedback_fn):
1553
    """Redistribute the configuration.
1554

1555
    """
1556
    self.cfg.Update(self.cfg.GetClusterInfo())
1557

    
1558

    
1559
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1560
  """Sleep and poll for an instance's disk to sync.
1561

1562
  """
1563
  if not instance.disks:
1564
    return True
1565

    
1566
  if not oneshot:
1567
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1568

    
1569
  node = instance.primary_node
1570

    
1571
  for dev in instance.disks:
1572
    lu.cfg.SetDiskID(dev, node)
1573

    
1574
  retries = 0
1575
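  # poll until no disk reports a sync in progress; if the node returns
  # no data we retry up to 10 times with a 6 second sleep, otherwise we
  # sleep min(60, max_time) seconds between polls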
  while True:
1576
    max_time = 0
1577
    done = True
1578
    cumul_degraded = False
1579
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1580
    if rstats.failed or not rstats.data:
1581
      lu.LogWarning("Can't get any data from node %s", node)
1582
      retries += 1
1583
      if retries >= 10:
1584
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1585
                                 " aborting." % node)
1586
      time.sleep(6)
1587
      continue
1588
    rstats = rstats.data
1589
    retries = 0
1590
    for i, mstat in enumerate(rstats):
1591
      if mstat is None:
1592
        lu.LogWarning("Can't compute data for node %s/%s",
1593
                           node, instance.disks[i].iv_name)
1594
        continue
1595
      # we ignore the ldisk parameter
1596
      perc_done, est_time, is_degraded, _ = mstat
1597
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1598
      if perc_done is not None:
1599
        done = False
1600
        if est_time is not None:
1601
          rem_time = "%d estimated seconds remaining" % est_time
1602
          max_time = est_time
1603
        else:
1604
          rem_time = "no time estimate"
1605
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1606
                        (instance.disks[i].iv_name, perc_done, rem_time))
1607
    if done or oneshot:
1608
      break
1609

    
1610
    time.sleep(min(60, max_time))
1611

    
1612
  if done:
1613
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1614
  return not cumul_degraded
1615

    
1616

    
1617
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1618
  """Check that mirrors are not degraded.
1619

1620
  The ldisk parameter, if True, will change the test from the
1621
  is_degraded attribute (which represents overall non-ok status for
1622
  the device(s)) to the ldisk (representing the local storage status).
1623

1624
  """
1625
  lu.cfg.SetDiskID(dev, node)
1626
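  # index into the status tuple returned by blockdev_find: 5 is the
  # overall is_degraded flag, 6 is the ldisk (local storage) status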
  if ldisk:
1627
    idx = 6
1628
  else:
1629
    idx = 5
1630

    
1631
  result = True
1632
  if on_primary or dev.AssembleOnSecondary():
1633
    rstats = lu.rpc.call_blockdev_find(node, dev)
1634
    msg = rstats.RemoteFailMsg()
1635
    if msg:
1636
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1637
      result = False
1638
    elif not rstats.payload:
1639
      lu.LogWarning("Can't find disk on node %s", node)
1640
      result = False
1641
    else:
1642
      result = result and (not rstats.payload[idx])
1643
  if dev.children:
1644
    for child in dev.children:
1645
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1646

    
1647
  return result
1648

    
1649

    
1650
class LUDiagnoseOS(NoHooksLU):
1651
  """Logical unit for OS diagnose/query.
1652

1653
  """
1654
  _OP_REQP = ["output_fields", "names"]
1655
  REQ_BGL = False
1656
  _FIELDS_STATIC = utils.FieldSet()
1657
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1658

    
1659
  def ExpandNames(self):
1660
    if self.op.names:
1661
      raise errors.OpPrereqError("Selective OS query not supported")
1662

    
1663
    _CheckOutputFields(static=self._FIELDS_STATIC,
1664
                       dynamic=self._FIELDS_DYNAMIC,
1665
                       selected=self.op.output_fields)
1666

    
1667
    # Lock all nodes, in shared mode
1668
    self.needed_locks = {}
1669
    self.share_locks[locking.LEVEL_NODE] = 1
1670
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1671

    
1672
  def CheckPrereq(self):
1673
    """Check prerequisites.
1674

1675
    """
1676

    
1677
  @staticmethod
1678
  def _DiagnoseByOS(node_list, rlist):
1679
    """Remaps a per-node return list into an a per-os per-node dictionary
1680

1681
    @param node_list: a list with the names of all nodes
1682
    @param rlist: a map with node names as keys and OS objects as values
1683

1684
    @rtype: dict
1685
    @return: a dictionary with osnames as keys and as value another map, with
1686
        nodes as keys and list of OS objects as values, eg::
1687

1688
          {"debian-etch": {"node1": [<object>,...],
1689
                           "node2": [<object>,]}
1690
          }
1691

1692
    """
1693
    all_os = {}
1694
    for node_name, nr in rlist.iteritems():
1695
      if nr.failed or not nr.data:
1696
        continue
1697
      for os_obj in nr.data:
1698
        if os_obj.name not in all_os:
1699
          # build a list of nodes for this os containing empty lists
1700
          # for each node in node_list
1701
          all_os[os_obj.name] = {}
1702
          for nname in node_list:
1703
            all_os[os_obj.name][nname] = []
1704
        all_os[os_obj.name][node_name].append(os_obj)
1705
    return all_os
1706

    
1707
  def Exec(self, feedback_fn):
1708
    """Compute the list of OSes.
1709

1710
    """
1711
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1712
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()
1713
                   if node in node_list]
1714
    node_data = self.rpc.call_os_diagnose(valid_nodes)
1715
    if node_data == False:
1716
      raise errors.OpExecError("Can't gather the list of OSes")
1717
    pol = self._DiagnoseByOS(valid_nodes, node_data)
1718
    output = []
1719
    for os_name, os_data in pol.iteritems():
1720
      row = []
1721
      for field in self.op.output_fields:
1722
        if field == "name":
1723
          val = os_name
1724
        elif field == "valid":
1725
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1726
        elif field == "node_status":
1727
          val = {}
1728
          for node_name, nos_list in os_data.iteritems():
1729
            val[node_name] = [(v.status, v.path) for v in nos_list]
1730
        else:
1731
          raise errors.ParameterError(field)
1732
        row.append(val)
1733
      output.append(row)
1734

    
1735
    return output
1736

    
1737

    
1738
class LURemoveNode(LogicalUnit):
1739
  """Logical unit for removing a node.
1740

1741
  """
1742
  HPATH = "node-remove"
1743
  HTYPE = constants.HTYPE_NODE
1744
  _OP_REQP = ["node_name"]
1745

    
1746
  def BuildHooksEnv(self):
1747
    """Build hooks env.
1748

1749
    This doesn't run on the target node in the pre phase as a failed
1750
    node would then be impossible to remove.
1751

1752
    """
1753
    env = {
1754
      "OP_TARGET": self.op.node_name,
1755
      "NODE_NAME": self.op.node_name,
1756
      }
1757
    all_nodes = self.cfg.GetNodeList()
1758
    all_nodes.remove(self.op.node_name)
1759
    return env, all_nodes, all_nodes
1760

    
1761
  def CheckPrereq(self):
1762
    """Check prerequisites.
1763

1764
    This checks:
1765
     - the node exists in the configuration
1766
     - it does not have primary or secondary instances
1767
     - it's not the master
1768

1769
    Any errors are signalled by raising errors.OpPrereqError.
1770

1771
    """
1772
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1773
    if node is None:
1774
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1775

    
1776
    instance_list = self.cfg.GetInstanceList()
1777

    
1778
    masternode = self.cfg.GetMasterNode()
1779
    if node.name == masternode:
1780
      raise errors.OpPrereqError("Node is the master node,"
1781
                                 " you need to failover first.")
1782

    
1783
    for instance_name in instance_list:
1784
      instance = self.cfg.GetInstanceInfo(instance_name)
1785
      if node.name in instance.all_nodes:
1786
        raise errors.OpPrereqError("Instance %s is still running on the node,"
1787
                                   " please remove first." % instance_name)
1788
    self.op.node_name = node.name
1789
    self.node = node
1790

    
1791
  def Exec(self, feedback_fn):
1792
    """Removes the node from the cluster.
1793

1794
    """
1795
    node = self.node
1796
    logging.info("Stopping the node daemon and removing configs from node %s",
1797
                 node.name)
1798

    
1799
    self.context.RemoveNode(node.name)
1800

    
1801
    self.rpc.call_node_leave_cluster(node.name)
1802

    
1803
    # Promote nodes to master candidate as needed
1804
    _AdjustCandidatePool(self)
1805

    
1806

    
1807
class LUQueryNodes(NoHooksLU):
1808
  """Logical unit for querying nodes.
1809

1810
  """
1811
  _OP_REQP = ["output_fields", "names", "use_locking"]
1812
  REQ_BGL = False
1813
  _FIELDS_DYNAMIC = utils.FieldSet(
1814
    "dtotal", "dfree",
1815
    "mtotal", "mnode", "mfree",
1816
    "bootid",
1817
    "ctotal", "cnodes", "csockets",
1818
    )
1819

    
1820
  _FIELDS_STATIC = utils.FieldSet(
1821
    "name", "pinst_cnt", "sinst_cnt",
1822
    "pinst_list", "sinst_list",
1823
    "pip", "sip", "tags",
1824
    "serial_no",
1825
    "master_candidate",
1826
    "master",
1827
    "offline",
1828
    "drained",
1829
    )
1830

    
1831
  def ExpandNames(self):
1832
    _CheckOutputFields(static=self._FIELDS_STATIC,
1833
                       dynamic=self._FIELDS_DYNAMIC,
1834
                       selected=self.op.output_fields)
1835

    
1836
    self.needed_locks = {}
1837
    self.share_locks[locking.LEVEL_NODE] = 1
1838

    
1839
    if self.op.names:
1840
      self.wanted = _GetWantedNodes(self, self.op.names)
1841
    else:
1842
      self.wanted = locking.ALL_SET
1843

    
1844
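    # a live node query (RPC) is only needed if at least one requested
    # field is not a static one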
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1845
    self.do_locking = self.do_node_query and self.op.use_locking
1846
    if self.do_locking:
1847
      # if we don't request only static fields, we need to lock the nodes
1848
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
1849

    
1850

    
1851
  def CheckPrereq(self):
1852
    """Check prerequisites.
1853

1854
    """
1855
    # The validation of the node list is done in _GetWantedNodes, if
    # non-empty; if empty, there's no validation to do
1857
    pass
1858

    
1859
  def Exec(self, feedback_fn):
1860
    """Computes the list of nodes and their attributes.
1861

1862
    """
1863
    all_info = self.cfg.GetAllNodesInfo()
1864
    if self.do_locking:
1865
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1866
    elif self.wanted != locking.ALL_SET:
1867
      nodenames = self.wanted
1868
      missing = set(nodenames).difference(all_info.keys())
1869
      if missing:
1870
        raise errors.OpExecError(
1871
          "Some nodes were removed before retrieving their data: %s" % missing)
1872
    else:
1873
      nodenames = all_info.keys()
1874

    
1875
    nodenames = utils.NiceSort(nodenames)
1876
    nodelist = [all_info[name] for name in nodenames]
1877

    
1878
    # begin data gathering
1879

    
1880
    if self.do_node_query:
1881
      live_data = {}
1882
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1883
                                          self.cfg.GetHypervisorType())
1884
      for name in nodenames:
1885
        nodeinfo = node_data[name]
1886
        if not nodeinfo.failed and nodeinfo.data:
1887
          nodeinfo = nodeinfo.data
1888
          fn = utils.TryConvert
1889
          live_data[name] = {
1890
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1891
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1892
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
1893
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1894
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
1895
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1896
            "bootid": nodeinfo.get('bootid', None),
1897
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
1898
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
1899
            }
1900
        else:
1901
          live_data[name] = {}
1902
    else:
1903
      live_data = dict.fromkeys(nodenames, {})
1904

    
1905
    node_to_primary = dict([(name, set()) for name in nodenames])
1906
    node_to_secondary = dict([(name, set()) for name in nodenames])
1907

    
1908
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1909
                             "sinst_cnt", "sinst_list"))
1910
    if inst_fields & frozenset(self.op.output_fields):
1911
      instancelist = self.cfg.GetInstanceList()
1912

    
1913
      for instance_name in instancelist:
1914
        inst = self.cfg.GetInstanceInfo(instance_name)
1915
        if inst.primary_node in node_to_primary:
1916
          node_to_primary[inst.primary_node].add(inst.name)
1917
        for secnode in inst.secondary_nodes:
1918
          if secnode in node_to_secondary:
1919
            node_to_secondary[secnode].add(inst.name)
1920

    
1921
    master_node = self.cfg.GetMasterNode()
1922

    
1923
    # end data gathering
1924

    
1925
    output = []
1926
    for node in nodelist:
1927
      node_output = []
1928
      for field in self.op.output_fields:
1929
        if field == "name":
1930
          val = node.name
1931
        elif field == "pinst_list":
1932
          val = list(node_to_primary[node.name])
1933
        elif field == "sinst_list":
1934
          val = list(node_to_secondary[node.name])
1935
        elif field == "pinst_cnt":
1936
          val = len(node_to_primary[node.name])
1937
        elif field == "sinst_cnt":
1938
          val = len(node_to_secondary[node.name])
1939
        elif field == "pip":
1940
          val = node.primary_ip
1941
        elif field == "sip":
1942
          val = node.secondary_ip
1943
        elif field == "tags":
1944
          val = list(node.GetTags())
1945
        elif field == "serial_no":
1946
          val = node.serial_no
1947
        elif field == "master_candidate":
1948
          val = node.master_candidate
1949
        elif field == "master":
1950
          val = node.name == master_node
1951
        elif field == "offline":
1952
          val = node.offline
1953
        elif field == "drained":
1954
          val = node.drained
1955
        elif self._FIELDS_DYNAMIC.Matches(field):
1956
          val = live_data[node.name].get(field, None)
1957
        else:
1958
          raise errors.ParameterError(field)
1959
        node_output.append(val)
1960
      output.append(node_output)
1961

    
1962
    return output
1963

    
1964

    
1965
class LUQueryNodeVolumes(NoHooksLU):
1966
  """Logical unit for getting volumes on node(s).
1967

1968
  """
1969
  _OP_REQP = ["nodes", "output_fields"]
1970
  REQ_BGL = False
1971
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1972
  _FIELDS_STATIC = utils.FieldSet("node")
1973

    
1974
  def ExpandNames(self):
1975
    _CheckOutputFields(static=self._FIELDS_STATIC,
1976
                       dynamic=self._FIELDS_DYNAMIC,
1977
                       selected=self.op.output_fields)
1978

    
1979
    self.needed_locks = {}
1980
    self.share_locks[locking.LEVEL_NODE] = 1
1981
    if not self.op.nodes:
1982
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1983
    else:
1984
      self.needed_locks[locking.LEVEL_NODE] = \
1985
        _GetWantedNodes(self, self.op.nodes)
1986

    
1987
  def CheckPrereq(self):
1988
    """Check prerequisites.
1989

1990
    This computes the list of nodes from the acquired node locks.
1991

1992
    """
1993
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1994

    
1995
  def Exec(self, feedback_fn):
1996
    """Computes the list of nodes and their attributes.
1997

1998
    """
1999
    nodenames = self.nodes
2000
    volumes = self.rpc.call_node_volumes(nodenames)
2001

    
2002
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
2003
             in self.cfg.GetInstanceList()]
2004

    
2005
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2006

    
2007
    output = []
2008
    for node in nodenames:
2009
      if node not in volumes or volumes[node].failed or not volumes[node].data:
2010
        continue
2011

    
2012
      node_vols = volumes[node].data[:]
2013
      node_vols.sort(key=lambda vol: vol['dev'])
2014

    
2015
      for vol in node_vols:
2016
        node_output = []
2017
        for field in self.op.output_fields:
2018
          if field == "node":
2019
            val = node
2020
          elif field == "phys":
2021
            val = vol['dev']
2022
          elif field == "vg":
2023
            val = vol['vg']
2024
          elif field == "name":
2025
            val = vol['name']
2026
          elif field == "size":
2027
            val = int(float(vol['size']))
2028
          elif field == "instance":
2029
            for inst in ilist:
2030
              if node not in lv_by_node[inst]:
2031
                continue
2032
              if vol['name'] in lv_by_node[inst][node]:
2033
                val = inst.name
2034
                break
2035
            else:
2036
              val = '-'
2037
          else:
2038
            raise errors.ParameterError(field)
2039
          node_output.append(str(val))
2040

    
2041
        output.append(node_output)
2042

    
2043
    return output
2044

    
2045

    
2046
class LUAddNode(LogicalUnit):
2047
  """Logical unit for adding node to the cluster.
2048

2049
  """
2050
  HPATH = "node-add"
2051
  HTYPE = constants.HTYPE_NODE
2052
  _OP_REQP = ["node_name"]
2053

    
2054
  def BuildHooksEnv(self):
2055
    """Build hooks env.
2056

2057
    This will run on all nodes before, and on all nodes + the new node after.
2058

2059
    """
2060
    env = {
2061
      "OP_TARGET": self.op.node_name,
2062
      "NODE_NAME": self.op.node_name,
2063
      "NODE_PIP": self.op.primary_ip,
2064
      "NODE_SIP": self.op.secondary_ip,
2065
      }
2066
    nodes_0 = self.cfg.GetNodeList()
2067
    nodes_1 = nodes_0 + [self.op.node_name, ]
2068
    return env, nodes_0, nodes_1
2069

    
2070
  def CheckPrereq(self):
2071
    """Check prerequisites.
2072

2073
    This checks:
2074
     - the new node is not already in the config
2075
     - it is resolvable
2076
     - its parameters (single/dual homed) match the cluster
2077

2078
    Any errors are signalled by raising errors.OpPrereqError.
2079

2080
    """
2081
    node_name = self.op.node_name
2082
    cfg = self.cfg
2083

    
2084
    dns_data = utils.HostInfo(node_name)
2085

    
2086
    node = dns_data.name
2087
    primary_ip = self.op.primary_ip = dns_data.ip
2088
    secondary_ip = getattr(self.op, "secondary_ip", None)
2089
    if secondary_ip is None:
2090
      secondary_ip = primary_ip
2091
    if not utils.IsValidIP(secondary_ip):
2092
      raise errors.OpPrereqError("Invalid secondary IP given")
2093
    self.op.secondary_ip = secondary_ip
2094

    
2095
    node_list = cfg.GetNodeList()
2096
    if not self.op.readd and node in node_list:
2097
      raise errors.OpPrereqError("Node %s is already in the configuration" %
2098
                                 node)
2099
    elif self.op.readd and node not in node_list:
2100
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2101

    
2102
    for existing_node_name in node_list:
2103
      existing_node = cfg.GetNodeInfo(existing_node_name)
2104

    
2105
      if self.op.readd and node == existing_node_name:
2106
        if (existing_node.primary_ip != primary_ip or
2107
            existing_node.secondary_ip != secondary_ip):
2108
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
2109
                                     " address configuration as before")
2110
        continue
2111

    
2112
      if (existing_node.primary_ip == primary_ip or
2113
          existing_node.secondary_ip == primary_ip or
2114
          existing_node.primary_ip == secondary_ip or
2115
          existing_node.secondary_ip == secondary_ip):
2116
        raise errors.OpPrereqError("New node ip address(es) conflict with"
2117
                                   " existing node %s" % existing_node.name)
2118

    
2119
    # check that the type of the node (single versus dual homed) is the
2120
    # same as for the master
2121
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2122
    master_singlehomed = myself.secondary_ip == myself.primary_ip
2123
    newbie_singlehomed = secondary_ip == primary_ip
2124
    if master_singlehomed != newbie_singlehomed:
2125
      if master_singlehomed:
2126
        raise errors.OpPrereqError("The master has no private ip but the"
2127
                                   " new node has one")
2128
      else:
2129
        raise errors.OpPrereqError("The master has a private ip but the"
2130
                                   " new node doesn't have one")
2131

    
2132
    # check reachability
2133
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2134
      raise errors.OpPrereqError("Node not reachable by ping")
2135

    
2136
    if not newbie_singlehomed:
2137
      # check reachability from my secondary ip to newbie's secondary ip
2138
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2139
                           source=myself.secondary_ip):
2140
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2141
                                   " based ping to noded port")
2142

    
2143
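    # decide whether the new node will be a master candidate: promote it
    # only while the current number of candidates is below the pool size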
    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2144
    mc_now, _ = self.cfg.GetMasterCandidateStats()
2145
    master_candidate = mc_now < cp_size
2146

    
2147
    self.new_node = objects.Node(name=node,
2148
                                 primary_ip=primary_ip,
2149
                                 secondary_ip=secondary_ip,
2150
                                 master_candidate=master_candidate,
2151
                                 offline=False, drained=False)
2152

    
2153
  def Exec(self, feedback_fn):
2154
    """Adds the new node to the cluster.
2155

2156
    """
2157
    new_node = self.new_node
2158
    node = new_node.name
2159

    
2160
    # check connectivity
2161
    result = self.rpc.call_version([node])[node]
2162
    result.Raise()
2163
    if result.data:
2164
      if constants.PROTOCOL_VERSION == result.data:
2165
        logging.info("Communication to node %s fine, sw version %s match",
2166
                     node, result.data)
2167
      else:
2168
        raise errors.OpExecError("Version mismatch master version %s,"
2169
                                 " node version %s" %
2170
                                 (constants.PROTOCOL_VERSION, result.data))
2171
    else:
2172
      raise errors.OpExecError("Cannot get version from the new node")
2173

    
2174
    # setup ssh on node
2175
    logging.info("Copy ssh key to node %s", node)
2176
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2177
    keyarray = []
2178
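    # the files copied to the new node: the host DSA and RSA key pairs
    # plus the ssh key pair of the GANETI_RUNAS user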
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2179
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2180
                priv_key, pub_key]
2181

    
2182
    for i in keyfiles:
2183
      f = open(i, 'r')
2184
      try:
2185
        keyarray.append(f.read())
2186
      finally:
2187
        f.close()
2188

    
2189
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2190
                                    keyarray[2],
2191
                                    keyarray[3], keyarray[4], keyarray[5])
2192

    
2193
    msg = result.RemoteFailMsg()
2194
    if msg:
2195
      raise errors.OpExecError("Cannot transfer ssh keys to the"
2196
                               " new node: %s" % msg)
2197

    
2198
    # Add node to our /etc/hosts, and add key to known_hosts
2199
    utils.AddHostToEtcHosts(new_node.name)
2200

    
2201
    if new_node.secondary_ip != new_node.primary_ip:
2202
      result = self.rpc.call_node_has_ip_address(new_node.name,
2203
                                                 new_node.secondary_ip)
2204
      if result.failed or not result.data:
2205
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2206
                                 " you gave (%s). Please fix and re-run this"
2207
                                 " command." % new_node.secondary_ip)
2208

    
2209
    node_verify_list = [self.cfg.GetMasterNode()]
2210
    node_verify_param = {
2211
      'nodelist': [node],
2212
      # TODO: do a node-net-test as well?
2213
    }
2214

    
2215
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2216
                                       self.cfg.GetClusterName())
2217
    for verifier in node_verify_list:
2218
      if result[verifier].failed or not result[verifier].data:
2219
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
2220
                                 " for remote verification" % verifier)
2221
      if result[verifier].data['nodelist']:
2222
        for failed in result[verifier].data['nodelist']:
2223
          feedback_fn("ssh/hostname verification failed %s -> %s" %
2224
                      (verifier, result[verifier].data['nodelist'][failed]))
2225
        raise errors.OpExecError("ssh/hostname verification failed.")
2226

    
2227
    # Distribute updated /etc/hosts and known_hosts to all nodes,
2228
    # including the node just added
2229
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2230
    dist_nodes = self.cfg.GetNodeList()
2231
    if not self.op.readd:
2232
      dist_nodes.append(node)
2233
    if myself.name in dist_nodes:
2234
      dist_nodes.remove(myself.name)
2235

    
2236
    logging.debug("Copying hosts and known_hosts to all nodes")
2237
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2238
      result = self.rpc.call_upload_file(dist_nodes, fname)
2239
      for to_node, to_result in result.iteritems():
2240
        if to_result.failed or not to_result.data:
2241
          logging.error("Copy of file %s to node %s failed", fname, to_node)
2242

    
2243
    to_copy = []
2244
    enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2245
    if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
2246
      to_copy.append(constants.VNC_PASSWORD_FILE)
2247

    
2248
    for fname in to_copy:
2249
      result = self.rpc.call_upload_file([node], fname)
2250
      if result[node].failed or not result[node].data:
2251
        logging.error("Could not copy file %s to node %s", fname, node)
2252

    
2253
    if self.op.readd:
2254
      self.context.ReaddNode(new_node)
2255
    else:
2256
      self.context.AddNode(new_node)
2257

    
2258

    
2259
class LUSetNodeParams(LogicalUnit):
2260
  """Modifies the parameters of a node.
2261

2262
  """
2263
  HPATH = "node-modify"
2264
  HTYPE = constants.HTYPE_NODE
2265
  _OP_REQP = ["node_name"]
2266
  REQ_BGL = False
2267

    
2268
  def CheckArguments(self):
2269
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
2270
    if node_name is None:
2271
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2272
    self.op.node_name = node_name
2273
    _CheckBooleanOpField(self.op, 'master_candidate')
2274
    _CheckBooleanOpField(self.op, 'offline')
2275
    _CheckBooleanOpField(self.op, 'drained')
2276
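    # None means "leave the flag unchanged"; at least one flag must be
    # given and at most one may be set to True at the same time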
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2277
    if all_mods.count(None) == 3:
2278
      raise errors.OpPrereqError("Please pass at least one modification")
2279
    if all_mods.count(True) > 1:
2280
      raise errors.OpPrereqError("Can't set the node into more than one"
2281
                                 " state at the same time")
2282

    
2283
  def ExpandNames(self):
2284
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2285

    
2286
  def BuildHooksEnv(self):
2287
    """Build hooks env.
2288

2289
    This runs on the master node.
2290

2291
    """
2292
    env = {
2293
      "OP_TARGET": self.op.node_name,
2294
      "MASTER_CANDIDATE": str(self.op.master_candidate),
2295
      "OFFLINE": str(self.op.offline),
2296
      "DRAINED": str(self.op.drained),
2297
      }
2298
    nl = [self.cfg.GetMasterNode(),
2299
          self.op.node_name]
2300
    return env, nl, nl
2301

    
2302
  def CheckPrereq(self):
2303
    """Check prerequisites.
2304

2305
    This checks that the requested flag changes are consistent with the
    node's current state and the cluster's master candidate pool.
2306

2307
    """
2308
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2309

    
2310
    if ((self.op.master_candidate == False or self.op.offline == True or
2311
         self.op.drained == True) and node.master_candidate):
2312
      # we will demote the node from master_candidate
2313
      if self.op.node_name == self.cfg.GetMasterNode():
2314
        raise errors.OpPrereqError("The master node has to be a"
2315
                                   " master candidate, online and not drained")
2316
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2317
      num_candidates, _ = self.cfg.GetMasterCandidateStats()
2318
      if num_candidates <= cp_size:
2319
        msg = ("Not enough master candidates (desired"
2320
               " %d, new value will be %d)" % (cp_size, num_candidates-1))
2321
        if self.op.force:
2322
          self.LogWarning(msg)
2323
        else:
2324
          raise errors.OpPrereqError(msg)
2325

    
2326
    if (self.op.master_candidate == True and
2327
        ((node.offline and not self.op.offline == False) or
2328
         (node.drained and not self.op.drained == False))):
2329
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2330
                                 " to master_candidate")
2331

    
2332
    return
2333

    
2334
  def Exec(self, feedback_fn):
2335
    """Modifies a node.
2336

2337
    """
2338
    node = self.node
2339

    
2340
    result = []
2341
    changed_mc = False
2342

    
2343
    if self.op.offline is not None:
2344
      node.offline = self.op.offline
2345
      result.append(("offline", str(self.op.offline)))
2346
      if self.op.offline == True:
2347
        if node.master_candidate:
2348
          node.master_candidate = False
2349
          changed_mc = True
2350
          result.append(("master_candidate", "auto-demotion due to offline"))
2351
        if node.drained:
2352
          node.drained = False
2353
          result.append(("drained", "clear drained status due to offline"))
2354

    
2355
    if self.op.master_candidate is not None:
2356
      node.master_candidate = self.op.master_candidate
2357
      changed_mc = True
2358
      result.append(("master_candidate", str(self.op.master_candidate)))
2359
      if self.op.master_candidate == False:
2360
        rrc = self.rpc.call_node_demote_from_mc(node.name)
2361
        msg = rrc.RemoteFailMsg()
2362
        if msg:
2363
          self.LogWarning("Node failed to demote itself: %s" % msg)
2364

    
2365
    if self.op.drained is not None:
2366
      node.drained = self.op.drained
2367
      result.append(("drained", str(self.op.drained)))
2368
      if self.op.drained == True:
2369
        if node.master_candidate:
2370
          node.master_candidate = False
2371
          changed_mc = True
2372
          result.append(("master_candidate", "auto-demotion due to drain"))
2373
        if node.offline:
2374
          node.offline = False
2375
          result.append(("offline", "clear offline status due to drain"))
2376

    
2377
    # this will trigger configuration file update, if needed
2378
    self.cfg.Update(node)
2379
    # this will trigger job queue propagation or cleanup
2380
    if changed_mc:
2381
      self.context.ReaddNode(node)
2382

    
2383
    return result
2384

    
2385

    
2386
class LUQueryClusterInfo(NoHooksLU):
2387
  """Query cluster configuration.
2388

2389
  """
2390
  _OP_REQP = []
2391
  REQ_BGL = False
2392

    
2393
  def ExpandNames(self):
2394
    self.needed_locks = {}
2395

    
2396
  def CheckPrereq(self):
2397
    """No prerequsites needed for this LU.
2398

2399
    """
2400
    pass
2401

    
2402
  def Exec(self, feedback_fn):
2403
    """Return cluster config.
2404

2405
    """
2406
    cluster = self.cfg.GetClusterInfo()
2407
    result = {
2408
      "software_version": constants.RELEASE_VERSION,
2409
      "protocol_version": constants.PROTOCOL_VERSION,
2410
      "config_version": constants.CONFIG_VERSION,
2411
      "os_api_version": constants.OS_API_VERSION,
2412
      "export_version": constants.EXPORT_VERSION,
2413
      "architecture": (platform.architecture()[0], platform.machine()),
2414
      "name": cluster.cluster_name,
2415
      "master": cluster.master_node,
2416
      "default_hypervisor": cluster.default_hypervisor,
2417
      "enabled_hypervisors": cluster.enabled_hypervisors,
2418
      "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2419
                        for hypervisor in cluster.enabled_hypervisors]),
2420
      "beparams": cluster.beparams,
2421
      "candidate_pool_size": cluster.candidate_pool_size,
2422
      }
2423

    
2424
    return result
2425

    
2426

    
2427
class LUQueryConfigValues(NoHooksLU):
2428
  """Return configuration values.
2429

2430
  """
2431
  _OP_REQP = []
2432
  REQ_BGL = False
2433
  _FIELDS_DYNAMIC = utils.FieldSet()
2434
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2435

    
2436
  def ExpandNames(self):
2437
    self.needed_locks = {}
2438

    
2439
    _CheckOutputFields(static=self._FIELDS_STATIC,
2440
                       dynamic=self._FIELDS_DYNAMIC,
2441
                       selected=self.op.output_fields)
2442

    
2443
  def CheckPrereq(self):
2444
    """No prerequisites.
2445

2446
    """
2447
    pass
2448

    
2449
  def Exec(self, feedback_fn):
2450
    """Dump a representation of the cluster config to the standard output.
2451

2452
    """
2453
    values = []
2454
    for field in self.op.output_fields:
2455
      if field == "cluster_name":
2456
        entry = self.cfg.GetClusterName()
2457
      elif field == "master_node":
2458
        entry = self.cfg.GetMasterNode()
2459
      elif field == "drain_flag":
2460
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2461
      else:
2462
        raise errors.ParameterError(field)
2463
      values.append(entry)
2464
    return values
2465

    
2466

    
2467
class LUActivateInstanceDisks(NoHooksLU):
2468
  """Bring up an instance's disks.
2469

2470
  """
2471
  _OP_REQP = ["instance_name"]
2472
  REQ_BGL = False
2473

    
2474
  def ExpandNames(self):
2475
    self._ExpandAndLockInstance()
2476
    self.needed_locks[locking.LEVEL_NODE] = []
2477
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2478

    
2479
  def DeclareLocks(self, level):
2480
    if level == locking.LEVEL_NODE:
2481
      self._LockInstancesNodes()
2482

    
2483
  def CheckPrereq(self):
2484
    """Check prerequisites.
2485

2486
    This checks that the instance is in the cluster.
2487

2488
    """
2489
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2490
    assert self.instance is not None, \
2491
      "Cannot retrieve locked instance %s" % self.op.instance_name
2492
    _CheckNodeOnline(self, self.instance.primary_node)
2493

    
2494
  def Exec(self, feedback_fn):
2495
    """Activate the disks.
2496

2497
    """
2498
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2499
    if not disks_ok:
2500
      raise errors.OpExecError("Cannot activate block devices")
2501

    
2502
    return disks_info
2503

    
2504

    
2505
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2506
  """Prepare the block devices for an instance.
2507

2508
  This sets up the block devices on all nodes.
2509

2510
  @type lu: L{LogicalUnit}
2511
  @param lu: the logical unit on whose behalf we execute
2512
  @type instance: L{objects.Instance}
2513
  @param instance: the instance for whose disks we assemble
2514
  @type ignore_secondaries: boolean
2515
  @param ignore_secondaries: if true, errors on secondary nodes
2516
      won't result in an error return from the function
2517
  @return: False if the operation failed, otherwise a list of
2518
      (host, instance_visible_name, node_visible_name)
2519
      with the mapping from node devices to instance devices
2520

2521
  """
2522
  device_info = []
2523
  disks_ok = True
2524
  iname = instance.name
2525
  # With the two-pass mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it
2528

    
2529
  # The proper fix would be to wait (with some limits) until the
2530
  # connection has been made and drbd transitions from WFConnection
2531
  # into any other network-connected state (Connected, SyncTarget,
2532
  # SyncSource, etc.)
2533

    
2534
  # 1st pass, assemble on all nodes in secondary mode
2535
  for inst_disk in instance.disks:
2536
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2537
      lu.cfg.SetDiskID(node_disk, node)
2538
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2539
      msg = result.RemoteFailMsg()
2540
      if msg:
2541
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2542
                           " (is_primary=False, pass=1): %s",
2543
                           inst_disk.iv_name, node, msg)
2544
        if not ignore_secondaries:
2545
          disks_ok = False
2546

    
2547
  # FIXME: race condition on drbd migration to primary
2548

    
2549
  # 2nd pass, do only the primary node
2550
  for inst_disk in instance.disks:
2551
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2552
      if node != instance.primary_node:
2553
        continue
2554
      lu.cfg.SetDiskID(node_disk, node)
2555
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2556
      msg = result.RemoteFailMsg()
2557
      if msg:
2558
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2559
                           " (is_primary=True, pass=2): %s",
2560
                           inst_disk.iv_name, node, msg)
2561
        disks_ok = False
2562
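    # record the mapping for the primary node; result.payload is the
    # node-visible device name returned by blockdev_assemble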
    device_info.append((instance.primary_node, inst_disk.iv_name,
2563
                        result.payload))
2564

    
2565
  # leave the disks configured for the primary node
2566
  # this is a workaround that would be fixed better by
2567
  # improving the logical/physical id handling
2568
  for disk in instance.disks:
2569
    lu.cfg.SetDiskID(disk, instance.primary_node)
2570

    
2571
  return disks_ok, device_info
2572

    
2573

    
2574
def _StartInstanceDisks(lu, instance, force):
2575
  """Start the disks of an instance.
2576

2577
  """
2578
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2579
                                           ignore_secondaries=force)
2580
  if not disks_ok:
2581
    _ShutdownInstanceDisks(lu, instance)
2582
    if force is not None and not force:
2583
      lu.proc.LogWarning("", hint="If the message above refers to a"
2584
                         " secondary node,"
2585
                         " you can retry the operation using '--force'.")
2586
    raise errors.OpExecError("Disk consistency error")
2587

    
2588

    
2589
class LUDeactivateInstanceDisks(NoHooksLU):
2590
  """Shutdown an instance's disks.
2591

2592
  """
2593
  _OP_REQP = ["instance_name"]
2594
  REQ_BGL = False
2595

    
2596
  def ExpandNames(self):
2597
    self._ExpandAndLockInstance()
2598
    self.needed_locks[locking.LEVEL_NODE] = []
2599
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2600

    
2601
  def DeclareLocks(self, level):
2602
    if level == locking.LEVEL_NODE:
2603
      self._LockInstancesNodes()
2604

    
2605
  def CheckPrereq(self):
2606
    """Check prerequisites.
2607

2608
    This checks that the instance is in the cluster.
2609

2610
    """
2611
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2612
    assert self.instance is not None, \
2613
      "Cannot retrieve locked instance %s" % self.op.instance_name
2614

    
2615
  def Exec(self, feedback_fn):
2616
    """Deactivate the disks
2617

2618
    """
2619
    instance = self.instance
2620
    _SafeShutdownInstanceDisks(self, instance)
2621

    
2622

    
2623
def _SafeShutdownInstanceDisks(lu, instance):
2624
  """Shutdown block devices of an instance.
2625

2626
  This function checks if an instance is running, before calling
2627
  _ShutdownInstanceDisks.
2628

2629
  """
2630
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
2631
                                      [instance.hypervisor])
2632
  ins_l = ins_l[instance.primary_node]
2633
  if ins_l.failed or not isinstance(ins_l.data, list):
2634
    raise errors.OpExecError("Can't contact node '%s'" %
2635
                             instance.primary_node)
2636

    
2637
  if instance.name in ins_l.data:
2638
    raise errors.OpExecError("Instance is running, can't shutdown"
2639
                             " block devices.")
2640

    
2641
  _ShutdownInstanceDisks(lu, instance)
2642

    
2643

    
2644
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2645
  """Shutdown block devices of an instance.
2646

2647
  This does the shutdown on all nodes of the instance.
2648

2649
  Errors on the primary node are ignored only if ignore_primary is
  true; errors on any other node always mark the shutdown as failed.
2651

2652
  """
2653
  all_result = True
2654
  for disk in instance.disks:
2655
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2656
      lu.cfg.SetDiskID(top_disk, node)
2657
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2658
      msg = result.RemoteFailMsg()
2659
      if msg:
2660
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2661
                      disk.iv_name, node, msg)
2662
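        # count the failure unless it happened on the primary node and
        # ignore_primary was requested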
        if not ignore_primary or node != instance.primary_node:
2663
          all_result = False
2664
  return all_result
2665

    
2666

    
2667
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2668
  """Checks if a node has enough free memory.
2669

2670
  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.
2674

2675
  @type lu: C{LogicalUnit}
2676
  @param lu: a logical unit from which we get configuration data
2677
  @type node: C{str}
2678
  @param node: the node to check
2679
  @type reason: C{str}
2680
  @param reason: string to use in the error message
2681
  @type requested: C{int}
2682
  @param requested: the amount of memory in MiB to check for
2683
  @type hypervisor_name: C{str}
2684
  @param hypervisor_name: the hypervisor to ask for memory stats
2685
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2686
      we cannot check the node
2687

2688
  """
2689
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2690
  nodeinfo[node].Raise()
2691
  free_mem = nodeinfo[node].data.get('memory_free')
2692
  if not isinstance(free_mem, int):
2693
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2694
                             " was '%s'" % (node, free_mem))
2695
  if requested > free_mem:
2696
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2697
                             " needed %s MiB, available %s MiB" %
2698
                             (node, reason, requested, free_mem))
2699

    
2700

    
2701
class LUStartupInstance(LogicalUnit):
2702
  """Starts an instance.
2703

2704
  """
2705
  HPATH = "instance-start"
2706
  HTYPE = constants.HTYPE_INSTANCE
2707
  _OP_REQP = ["instance_name", "force"]
2708
  REQ_BGL = False
2709

    
2710
  def ExpandNames(self):
2711
    self._ExpandAndLockInstance()
2712

    
2713
  def BuildHooksEnv(self):
2714
    """Build hooks env.
2715

2716
    This runs on master, primary and secondary nodes of the instance.
2717

2718
    """
2719
    env = {
2720
      "FORCE": self.op.force,
2721
      }
2722
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2723
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2724
    return env, nl, nl
2725

    
2726
  def CheckPrereq(self):
2727
    """Check prerequisites.
2728

2729
    This checks that the instance is in the cluster.
2730

2731
    """
2732
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2733
    assert self.instance is not None, \
2734
      "Cannot retrieve locked instance %s" % self.op.instance_name
2735

    
2736
    _CheckNodeOnline(self, instance.primary_node)
2737

    
2738
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2739
    # check bridge existence
2740
    _CheckInstanceBridgesExist(self, instance)
2741

    
2742
    _CheckNodeFreeMemory(self, instance.primary_node,
2743
                         "starting instance %s" % instance.name,
2744
                         bep[constants.BE_MEMORY], instance.hypervisor)
2745

    
2746
  def Exec(self, feedback_fn):
2747
    """Start the instance.
2748

2749
    """
2750
    instance = self.instance
2751
    force = self.op.force
2752

    
2753
    self.cfg.MarkInstanceUp(instance.name)
2754

    
2755
    node_current = instance.primary_node
2756

    
2757
    _StartInstanceDisks(self, instance, force)
2758

    
2759
    result = self.rpc.call_instance_start(node_current, instance)
2760
    msg = result.RemoteFailMsg()
2761
    if msg:
2762
      _ShutdownInstanceDisks(self, instance)
2763
      raise errors.OpExecError("Could not start instance: %s" % msg)
2764

    
2765

    
2766
class LURebootInstance(LogicalUnit):
2767
  """Reboot an instance.
2768

2769
  """
2770
  HPATH = "instance-reboot"
2771
  HTYPE = constants.HTYPE_INSTANCE
2772
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2773
  REQ_BGL = False
2774

    
2775
  def ExpandNames(self):
2776
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2777
                                   constants.INSTANCE_REBOOT_HARD,
2778
                                   constants.INSTANCE_REBOOT_FULL]:
2779
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2780
                                  (constants.INSTANCE_REBOOT_SOFT,
2781
                                   constants.INSTANCE_REBOOT_HARD,
2782
                                   constants.INSTANCE_REBOOT_FULL))
2783
    self._ExpandAndLockInstance()
2784

    
2785
  def BuildHooksEnv(self):
2786
    """Build hooks env.
2787

2788
    This runs on master, primary and secondary nodes of the instance.
2789

2790
    """
2791
    env = {
2792
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2793
      "REBOOT_TYPE": self.op.reboot_type,
2794
      }
2795
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2796
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2797
    return env, nl, nl
2798

    
2799
  def CheckPrereq(self):
2800
    """Check prerequisites.
2801

2802
    This checks that the instance is in the cluster.
2803

2804
    """
2805
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2806
    assert self.instance is not None, \
2807
      "Cannot retrieve locked instance %s" % self.op.instance_name
2808

    
2809
    _CheckNodeOnline(self, instance.primary_node)
2810

    
2811
    # check bridge existence
2812
    _CheckInstanceBridgesExist(self, instance)
2813

    
2814
  def Exec(self, feedback_fn):
2815
    """Reboot the instance.
2816

2817
    """
2818
    instance = self.instance
2819
    ignore_secondaries = self.op.ignore_secondaries
2820
    reboot_type = self.op.reboot_type
2821

    
2822
    node_current = instance.primary_node
2823

    
2824
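    # soft and hard reboots are delegated to the hypervisor; a full
    # reboot is done as shutdown + disk restart + start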
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2825
                       constants.INSTANCE_REBOOT_HARD]:
2826
      for disk in instance.disks:
2827
        self.cfg.SetDiskID(disk, node_current)
2828
      result = self.rpc.call_instance_reboot(node_current, instance,
2829
                                             reboot_type)
2830
      msg = result.RemoteFailMsg()
2831
      if msg:
2832
        raise errors.OpExecError("Could not reboot instance: %s" % msg)
2833
    else:
2834
      result = self.rpc.call_instance_shutdown(node_current, instance)
2835
      msg = result.RemoteFailMsg()
2836
      if msg:
2837
        raise errors.OpExecError("Could not shutdown instance for"
2838
                                 " full reboot: %s" % msg)
2839
      _ShutdownInstanceDisks(self, instance)
2840
      _StartInstanceDisks(self, instance, ignore_secondaries)
2841
      result = self.rpc.call_instance_start(node_current, instance)
2842
      msg = result.RemoteFailMsg()
2843
      if msg:
2844
        _ShutdownInstanceDisks(self, instance)
2845
        raise errors.OpExecError("Could not start instance for"
2846
                                 " full reboot: %s" % msg)
2847

    
2848
    self.cfg.MarkInstanceUp(instance.name)
2849

    
2850

    
2851
class LUShutdownInstance(LogicalUnit):
2852
  """Shutdown an instance.
2853

2854
  """
2855
  HPATH = "instance-stop"
2856
  HTYPE = constants.HTYPE_INSTANCE
2857
  _OP_REQP = ["instance_name"]
2858
  REQ_BGL = False
2859

    
2860
  def ExpandNames(self):
2861
    self._ExpandAndLockInstance()
2862

    
2863
  def BuildHooksEnv(self):
2864
    """Build hooks env.
2865

2866
    This runs on master, primary and secondary nodes of the instance.
2867

2868
    """
2869
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2870
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2871
    return env, nl, nl
2872

    
2873
  def CheckPrereq(self):
2874
    """Check prerequisites.
2875

2876
    This checks that the instance is in the cluster.
2877

2878
    """
2879
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2880
    assert self.instance is not None, \
2881
      "Cannot retrieve locked instance %s" % self.op.instance_name
2882
    _CheckNodeOnline(self, self.instance.primary_node)
2883

    
2884
  def Exec(self, feedback_fn):
2885
    """Shutdown the instance.
2886

2887
    """
2888
    instance = self.instance
2889
    node_current = instance.primary_node
2890
    self.cfg.MarkInstanceDown(instance.name)
2891
    result = self.rpc.call_instance_shutdown(node_current, instance)
2892
    msg = result.RemoteFailMsg()
2893
    if msg:
2894
      self.proc.LogWarning("Could not shutdown instance: %s" % msg)
2895

    
2896
    _ShutdownInstanceDisks(self, instance)
2897

    
2898

    
2899
class LUReinstallInstance(LogicalUnit):
2900
  """Reinstall an instance.
2901

2902
  """
2903
  HPATH = "instance-reinstall"
2904
  HTYPE = constants.HTYPE_INSTANCE
2905
  _OP_REQP = ["instance_name"]
2906
  REQ_BGL = False
2907

    
2908
  def ExpandNames(self):
2909
    self._ExpandAndLockInstance()
2910

    
2911
  def BuildHooksEnv(self):
2912
    """Build hooks env.
2913

2914
    This runs on master, primary and secondary nodes of the instance.
2915

2916
    """
2917
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2918
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2919
    return env, nl, nl
2920

    
2921
  def CheckPrereq(self):
2922
    """Check prerequisites.
2923

2924
    This checks that the instance is in the cluster and is not running.
2925

2926
    """
2927
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2928
    assert instance is not None, \
2929
      "Cannot retrieve locked instance %s" % self.op.instance_name
2930
    _CheckNodeOnline(self, instance.primary_node)
2931

    
2932
    if instance.disk_template == constants.DT_DISKLESS:
2933
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2934
                                 self.op.instance_name)
2935
    if instance.admin_up:
2936
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2937
                                 self.op.instance_name)
2938
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2939
                                              instance.name,
2940
                                              instance.hypervisor)
2941
    if remote_info.failed or remote_info.data:
2942
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2943
                                 (self.op.instance_name,
2944
                                  instance.primary_node))
2945

    
2946
    self.op.os_type = getattr(self.op, "os_type", None)
2947
    if self.op.os_type is not None:
2948
      # OS verification
2949
      pnode = self.cfg.GetNodeInfo(
2950
        self.cfg.ExpandNodeName(instance.primary_node))
2951
      if pnode is None:
2952
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2953
                                   self.op.pnode)
2954
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2955
      result.Raise()
2956
      if not isinstance(result.data, objects.OS):
2957
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2958
                                   " primary node"  % self.op.os_type)
2959

    
2960
    self.instance = instance
2961

    
2962
  def Exec(self, feedback_fn):
2963
    """Reinstall the instance.
2964

2965
    """
2966
    inst = self.instance
2967

    
2968
    if self.op.os_type is not None:
2969
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2970
      inst.os = self.op.os_type
2971
      self.cfg.Update(inst)
2972

    
2973
    _StartInstanceDisks(self, inst, None)
2974
    try:
2975
      feedback_fn("Running the instance OS create scripts...")
2976
      result = self.rpc.call_instance_os_add(inst.primary_node, inst)
2977
      msg = result.RemoteFailMsg()
2978
      if msg:
2979
        raise errors.OpExecError("Could not install OS for instance %s"
2980
                                 " on node %s: %s" %
2981
                                 (inst.name, inst.primary_node, msg))
2982
    finally:
2983
      _ShutdownInstanceDisks(self, inst)
2984

    
2985

    
2986
class LURenameInstance(LogicalUnit):
2987
  """Rename an instance.
2988

2989
  """
2990
  HPATH = "instance-rename"
2991
  HTYPE = constants.HTYPE_INSTANCE
2992
  _OP_REQP = ["instance_name", "new_name"]
2993

    
2994
  def BuildHooksEnv(self):
2995
    """Build hooks env.
2996

2997
    This runs on master, primary and secondary nodes of the instance.
2998

2999
    """
3000
    env = _BuildInstanceHookEnvByObject(self, self.instance)
3001
    env["INSTANCE_NEW_NAME"] = self.op.new_name
3002
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3003
    return env, nl, nl
3004

    
3005
  def CheckPrereq(self):
3006
    """Check prerequisites.
3007

3008
    This checks that the instance is in the cluster and is not running.
3009

3010
    """
3011
    instance = self.cfg.GetInstanceInfo(
3012
      self.cfg.ExpandInstanceName(self.op.instance_name))
3013
    if instance is None:
3014
      raise errors.OpPrereqError("Instance '%s' not known" %
3015
                                 self.op.instance_name)
3016
    _CheckNodeOnline(self, instance.primary_node)
3017

    
3018
    if instance.admin_up:
3019
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3020
                                 self.op.instance_name)
3021
    remote_info = self.rpc.call_instance_info(instance.primary_node,
3022
                                              instance.name,
3023
                                              instance.hypervisor)
3024
    remote_info.Raise()
3025
    if remote_info.data:
3026
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3027
                                 (self.op.instance_name,
3028
                                  instance.primary_node))
3029
    self.instance = instance
3030

    
3031
    # new name verification
3032
    name_info = utils.HostInfo(self.op.new_name)
3033

    
3034
    self.op.new_name = new_name = name_info.name
3035
    instance_list = self.cfg.GetInstanceList()
3036
    if new_name in instance_list:
3037
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3038
                                 new_name)
3039

    
3040
    if not getattr(self.op, "ignore_ip", False):
3041
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3042
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3043
                                   (name_info.ip, new_name))
3044

    
3045

    
3046
  def Exec(self, feedback_fn):
3047
    """Reinstall the instance.
3048

3049
    """
3050
    inst = self.instance
3051
    old_name = inst.name
3052

    
3053
    if inst.disk_template == constants.DT_FILE:
3054
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3055

    
3056
    self.cfg.RenameInstance(inst.name, self.op.new_name)
3057
    # Change the instance lock. This is definitely safe while we hold the BGL
3058
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3059
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3060

    
3061
    # re-read the instance from the configuration after rename
3062
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
3063

    
3064
    if inst.disk_template == constants.DT_FILE:
3065
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3066
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3067
                                                     old_file_storage_dir,
3068
                                                     new_file_storage_dir)
3069
      result.Raise()
3070
      if not result.data:
3071
        raise errors.OpExecError("Could not connect to node '%s' to rename"
3072
                                 " directory '%s' to '%s' (but the instance"
3073
                                 " has been renamed in Ganeti)" % (
3074
                                 inst.primary_node, old_file_storage_dir,
3075
                                 new_file_storage_dir))
3076

    
3077
      if not result.data[0]:
3078
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3079
                                 " (but the instance has been renamed in"
3080
                                 " Ganeti)" % (old_file_storage_dir,
3081
                                               new_file_storage_dir))
3082

    
3083
    _StartInstanceDisks(self, inst, None)
3084
    try:
3085
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3086
                                                 old_name)
3087
      msg = result.RemoteFailMsg()
3088
      if msg:
3089
        msg = ("Could not run OS rename script for instance %s on node %s"
3090
               " (but the instance has been renamed in Ganeti): %s" %
3091
               (inst.name, inst.primary_node, msg))
3092
        self.proc.LogWarning(msg)
3093
    finally:
3094
      _ShutdownInstanceDisks(self, inst)
3095

    
3096

    
3097
class LURemoveInstance(LogicalUnit):
3098
  """Remove an instance.
3099

3100
  """
3101
  HPATH = "instance-remove"
3102
  HTYPE = constants.HTYPE_INSTANCE
3103
  _OP_REQP = ["instance_name", "ignore_failures"]
3104
  REQ_BGL = False
3105

    
3106
  def ExpandNames(self):
3107
    self._ExpandAndLockInstance()
3108
    self.needed_locks[locking.LEVEL_NODE] = []
3109
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3110

    
3111
  def DeclareLocks(self, level):
3112
    if level == locking.LEVEL_NODE:
3113
      self._LockInstancesNodes()
3114

    
3115
  def BuildHooksEnv(self):
3116
    """Build hooks env.
3117

3118
    This runs on master, primary and secondary nodes of the instance.
3119

3120
    """
3121
    env = _BuildInstanceHookEnvByObject(self, self.instance)
3122
    nl = [self.cfg.GetMasterNode()]
3123
    return env, nl, nl
3124

    
3125
  def CheckPrereq(self):
3126
    """Check prerequisites.
3127

3128
    This checks that the instance is in the cluster.
3129

3130
    """
3131
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3132
    assert self.instance is not None, \
3133
      "Cannot retrieve locked instance %s" % self.op.instance_name
3134

    
3135
  def Exec(self, feedback_fn):
3136
    """Remove the instance.
3137

3138
    """
3139
    instance = self.instance
3140
    logging.info("Shutting down instance %s on node %s",
3141
                 instance.name, instance.primary_node)
3142

    
3143
    result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3144
    msg = result.RemoteFailMsg()
3145
    if msg:
3146
      if self.op.ignore_failures:
3147
        feedback_fn("Warning: can't shutdown instance: %s" % msg)
3148
      else:
3149
        raise errors.OpExecError("Could not shutdown instance %s on"
3150
                                 " node %s: %s" %
3151
                                 (instance.name, instance.primary_node, msg))
3152

    
3153
    logging.info("Removing block devices for instance %s", instance.name)
3154

    
3155
    if not _RemoveDisks(self, instance):
3156
      if self.op.ignore_failures:
3157
        feedback_fn("Warning: can't remove instance's disks")
3158
      else:
3159
        raise errors.OpExecError("Can't remove instance's disks")
3160

    
3161
    logging.info("Removing instance %s out of cluster config", instance.name)
3162

    
3163
    self.cfg.RemoveInstance(instance.name)
3164
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3165

    
3166

    
3167
class LUQueryInstances(NoHooksLU):
3168
  """Logical unit for querying instances.
3169

3170
  """
3171
  _OP_REQP = ["output_fields", "names", "use_locking"]
3172
  REQ_BGL = False
3173
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3174
                                    "admin_state",
3175
                                    "disk_template", "ip", "mac", "bridge",
3176
                                    "sda_size", "sdb_size", "vcpus", "tags",
3177
                                    "network_port", "beparams",
3178
                                    r"(disk)\.(size)/([0-9]+)",
3179
                                    r"(disk)\.(sizes)", "disk_usage",
3180
                                    r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3181
                                    r"(nic)\.(macs|ips|bridges)",
3182
                                    r"(disk|nic)\.(count)",
3183
                                    "serial_no", "hypervisor", "hvparams",] +
3184
                                  ["hv/%s" % name
3185
                                   for name in constants.HVS_PARAMETERS] +
3186
                                  ["be/%s" % name
3187
                                   for name in constants.BES_PARAMETERS])
3188
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3189

    
3190

    
3191
  def ExpandNames(self):
3192
    _CheckOutputFields(static=self._FIELDS_STATIC,
3193
                       dynamic=self._FIELDS_DYNAMIC,
3194
                       selected=self.op.output_fields)
3195

    
3196
    self.needed_locks = {}
3197
    self.share_locks[locking.LEVEL_INSTANCE] = 1
3198
    self.share_locks[locking.LEVEL_NODE] = 1
3199

    
3200
    if self.op.names:
3201
      self.wanted = _GetWantedInstances(self, self.op.names)
3202
    else:
3203
      self.wanted = locking.ALL_SET
3204

    
3205
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3206
    self.do_locking = self.do_node_query and self.op.use_locking
3207
    if self.do_locking:
3208
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3209
      self.needed_locks[locking.LEVEL_NODE] = []
3210
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3211

    
3212
  def DeclareLocks(self, level):
3213
    if level == locking.LEVEL_NODE and self.do_locking:
3214
      self._LockInstancesNodes()
3215

    
3216
  def CheckPrereq(self):
3217
    """Check prerequisites.
3218

3219
    """
3220
    pass
3221

    
3222
  def Exec(self, feedback_fn):
3223
    """Computes the list of nodes and their attributes.
3224

3225
    """
3226
    all_info = self.cfg.GetAllInstancesInfo()
3227
    if self.wanted == locking.ALL_SET:
3228
      # caller didn't specify instance names, so ordering is not important
3229
      if self.do_locking:
3230
        instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3231
      else:
3232
        instance_names = all_info.keys()
3233
      instance_names = utils.NiceSort(instance_names)
3234
    else:
3235
      # caller did specify names, so we must keep the ordering
3236
      if self.do_locking:
3237
        tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3238
      else:
3239
        tgt_set = all_info.keys()
3240
      missing = set(self.wanted).difference(tgt_set)
3241
      if missing:
3242
        raise errors.OpExecError("Some instances were removed before"
3243
                                 " retrieving their data: %s" % missing)
3244
      instance_names = self.wanted
3245

    
3246
    instance_list = [all_info[iname] for iname in instance_names]
3247

    
3248
    # begin data gathering
3249

    
3250
    nodes = frozenset([inst.primary_node for inst in instance_list])
3251
    hv_list = list(set([inst.hypervisor for inst in instance_list]))
3252

    
3253
    bad_nodes = []
3254
    off_nodes = []
3255
    if self.do_node_query:
3256
      live_data = {}
3257
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3258
      for name in nodes:
3259
        result = node_data[name]
3260
        if result.offline:
3261
          # offline nodes will be in both lists
3262
          off_nodes.append(name)
3263
        if result.failed:
3264
          bad_nodes.append(name)
3265
        else:
3266
          if result.data:
3267
            live_data.update(result.data)
3268
            # else no instance is alive
3269
    else:
3270
      live_data = dict([(name, {}) for name in instance_names])
3271

    
3272
    # end data gathering
3273

    
3274
    HVPREFIX = "hv/"
3275
    BEPREFIX = "be/"
3276
    output = []
3277
    for instance in instance_list:
3278
      iout = []
3279
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3280
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
3281
      for field in self.op.output_fields:
3282
        st_match = self._FIELDS_STATIC.Matches(field)
3283
        if field == "name":
3284
          val = instance.name
3285
        elif field == "os":
3286
          val = instance.os
3287
        elif field == "pnode":
3288
          val = instance.primary_node
3289
        elif field == "snodes":
3290
          val = list(instance.secondary_nodes)
3291
        elif field == "admin_state":
3292
          val = instance.admin_up
3293
        elif field == "oper_state":
3294
          if instance.primary_node in bad_nodes:
3295
            val = None
3296
          else:
3297
            val = bool(live_data.get(instance.name))
3298
        elif field == "status":
3299
          if instance.primary_node in off_nodes:
3300
            val = "ERROR_nodeoffline"
3301
          elif instance.primary_node in bad_nodes:
3302
            val = "ERROR_nodedown"
3303
          else:
3304
            running = bool(live_data.get(instance.name))
3305
            if running:
3306
              if instance.admin_up:
3307
                val = "running"
3308
              else:
3309
                val = "ERROR_up"
3310
            else:
3311
              if instance.admin_up:
3312
                val = "ERROR_down"
3313
              else:
3314
                val = "ADMIN_down"
3315
        elif field == "oper_ram":
3316
          if instance.primary_node in bad_nodes:
3317
            val = None
3318
          elif instance.name in live_data:
3319
            val = live_data[instance.name].get("memory", "?")
3320
          else:
3321
            val = "-"
3322
        elif field == "disk_template":
3323
          val = instance.disk_template
3324
        elif field == "ip":
3325
          val = instance.nics[0].ip
3326
        elif field == "bridge":
3327
          val = instance.nics[0].bridge
3328
        elif field == "mac":
3329
          val = instance.nics[0].mac
3330
        elif field == "sda_size" or field == "sdb_size":
3331
          idx = ord(field[2]) - ord('a')
3332
          try:
3333
            val = instance.FindDisk(idx).size
3334
          except errors.OpPrereqError:
3335
            val = None
3336
        elif field == "disk_usage": # total disk usage per node
3337
          disk_sizes = [{'size': disk.size} for disk in instance.disks]
3338
          val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3339
        elif field == "tags":
3340
          val = list(instance.GetTags())
3341
        elif field == "serial_no":
3342
          val = instance.serial_no
3343
        elif field == "network_port":
3344
          val = instance.network_port
3345
        elif field == "hypervisor":
3346
          val = instance.hypervisor
3347
        elif field == "hvparams":
3348
          val = i_hv
3349
        elif (field.startswith(HVPREFIX) and
3350
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3351
          val = i_hv.get(field[len(HVPREFIX):], None)
3352
        elif field == "beparams":
3353
          val = i_be
3354
        elif (field.startswith(BEPREFIX) and
3355
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3356
          val = i_be.get(field[len(BEPREFIX):], None)
3357
        elif st_match and st_match.groups():
3358
          # matches a variable list
3359
          st_groups = st_match.groups()
3360
          if st_groups and st_groups[0] == "disk":
3361
            if st_groups[1] == "count":
3362
              val = len(instance.disks)
3363
            elif st_groups[1] == "sizes":
3364
              val = [disk.size for disk in instance.disks]
3365
            elif st_groups[1] == "size":
3366
              try:
3367
                val = instance.FindDisk(st_groups[2]).size
3368
              except errors.OpPrereqError:
3369
                val = None
3370
            else:
3371
              assert False, "Unhandled disk parameter"
3372
          elif st_groups[0] == "nic":
3373
            if st_groups[1] == "count":
3374
              val = len(instance.nics)
3375
            elif st_groups[1] == "macs":
3376
              val = [nic.mac for nic in instance.nics]
3377
            elif st_groups[1] == "ips":
3378
              val = [nic.ip for nic in instance.nics]
3379
            elif st_groups[1] == "bridges":
3380
              val = [nic.bridge for nic in instance.nics]
3381
            else:
3382
              # index-based item
3383
              nic_idx = int(st_groups[2])
3384
              if nic_idx >= len(instance.nics):
3385
                val = None
3386
              else:
3387
                if st_groups[1] == "mac":
3388
                  val = instance.nics[nic_idx].mac
3389
                elif st_groups[1] == "ip":
3390
                  val = instance.nics[nic_idx].ip
3391
                elif st_groups[1] == "bridge":
3392
                  val = instance.nics[nic_idx].bridge
3393
                else:
3394
                  assert False, "Unhandled NIC parameter"
3395
          else:
3396
            assert False, "Unhandled variable parameter"
3397
        else:
3398
          raise errors.ParameterError(field)
3399
        iout.append(val)
3400
      output.append(iout)
3401

    
3402
    return output
3403

    
3404

    
3405
class LUFailoverInstance(LogicalUnit):
3406
  """Failover an instance.
3407

3408
  """
3409
  HPATH = "instance-failover"
3410
  HTYPE = constants.HTYPE_INSTANCE
3411
  _OP_REQP = ["instance_name", "ignore_consistency"]
3412
  REQ_BGL = False
3413

    
3414
  def ExpandNames(self):
3415
    self._ExpandAndLockInstance()
3416
    self.needed_locks[locking.LEVEL_NODE] = []
3417
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3418

    
3419
  def DeclareLocks(self, level):
3420
    if level == locking.LEVEL_NODE:
3421
      self._LockInstancesNodes()
3422

    
3423
  def BuildHooksEnv(self):
3424
    """Build hooks env.
3425

3426
    This runs on master, primary and secondary nodes of the instance.
3427

3428
    """
3429
    env = {
3430
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3431
      }
3432
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3433
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3434
    return env, nl, nl
3435

    
3436
  def CheckPrereq(self):
3437
    """Check prerequisites.
3438

3439
    This checks that the instance is in the cluster.
3440

3441
    """
3442
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3443
    assert self.instance is not None, \
3444
      "Cannot retrieve locked instance %s" % self.op.instance_name
3445

    
3446
    bep = self.cfg.GetClusterInfo().FillBE(instance)
3447
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3448
      raise errors.OpPrereqError("Instance's disk layout is not"
3449
                                 " network mirrored, cannot failover.")
3450

    
3451
    secondary_nodes = instance.secondary_nodes
3452
    if not secondary_nodes:
3453
      raise errors.ProgrammerError("no secondary node but using "
3454
                                   "a mirrored disk template")
3455

    
3456
    target_node = secondary_nodes[0]
3457
    _CheckNodeOnline(self, target_node)
3458
    _CheckNodeNotDrained(self, target_node)
3459
    # check memory requirements on the secondary node
3460
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3461
                         instance.name, bep[constants.BE_MEMORY],
3462
                         instance.hypervisor)
3463

    
3464
    # check bridge existance
3465
    brlist = [nic.bridge for nic in instance.nics]
3466
    result = self.rpc.call_bridges_exist(target_node, brlist)
3467
    result.Raise()
3468
    if not result.data:
3469
      raise errors.OpPrereqError("One or more target bridges %s does not"
3470
                                 " exist on destination node '%s'" %
3471
                                 (brlist, target_node))
3472

    
3473
  def Exec(self, feedback_fn):
3474
    """Failover an instance.
3475

3476
    The failover is done by shutting it down on its present node and
3477
    starting it on the secondary.
3478

3479
    """
3480
    instance = self.instance
3481

    
3482
    source_node = instance.primary_node
3483
    target_node = instance.secondary_nodes[0]
3484

    
3485
    feedback_fn("* checking disk consistency between source and target")
3486
    for dev in instance.disks:
3487
      # for drbd, these are drbd over lvm
3488
      if not _CheckDiskConsistency(self, dev, target_node, False):
3489
        if instance.admin_up and not self.op.ignore_consistency:
3490
          raise errors.OpExecError("Disk %s is degraded on target node,"
3491
                                   " aborting failover." % dev.iv_name)
3492

    
3493
    feedback_fn("* shutting down instance on source node")
3494
    logging.info("Shutting down instance %s on node %s",
3495
                 instance.name, source_node)
3496

    
3497
    result = self.rpc.call_instance_shutdown(source_node, instance)
3498
    msg = result.RemoteFailMsg()
3499
    if msg:
3500
      if self.op.ignore_consistency:
3501
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
3502
                             " Proceeding anyway. Please make sure node"
3503
                             " %s is down. Error details: %s",
3504
                             instance.name, source_node, source_node, msg)
3505
      else:
3506
        raise errors.OpExecError("Could not shutdown instance %s on"
3507
                                 " node %s: %s" %
3508
                                 (instance.name, source_node, msg))
3509

    
3510
    feedback_fn("* deactivating the instance's disks on source node")
3511
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3512
      raise errors.OpExecError("Can't shut down the instance's disks.")
3513

    
3514
    instance.primary_node = target_node
3515
    # distribute new instance config to the other nodes
3516
    self.cfg.Update(instance)
3517

    
3518
    # Only start the instance if it's marked as up
3519
    if instance.admin_up:
3520
      feedback_fn("* activating the instance's disks on target node")
3521
      logging.info("Starting instance %s on node %s",
3522
                   instance.name, target_node)
3523

    
3524
      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3525
                                               ignore_secondaries=True)
3526
      if not disks_ok:
3527
        _ShutdownInstanceDisks(self, instance)
3528
        raise errors.OpExecError("Can't activate the instance's disks")
3529

    
3530
      feedback_fn("* starting the instance on the target node")
3531
      result = self.rpc.call_instance_start(target_node, instance)
3532
      msg = result.RemoteFailMsg()
3533
      if msg:
3534
        _ShutdownInstanceDisks(self, instance)
3535
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3536
                                 (instance.name, target_node, msg))
3537

    
3538

    
3539
class LUMigrateInstance(LogicalUnit):
3540
  """Migrate an instance.
3541

3542
  This is migration without shutting down, compared to the failover,
3543
  which is done with shutdown.
3544

3545
  """
3546
  HPATH = "instance-migrate"
3547
  HTYPE = constants.HTYPE_INSTANCE
3548
  _OP_REQP = ["instance_name", "live", "cleanup"]
3549

    
3550
  REQ_BGL = False
3551

    
3552
  def ExpandNames(self):
3553
    self._ExpandAndLockInstance()
3554
    self.needed_locks[locking.LEVEL_NODE] = []
3555
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3556

    
3557
  def DeclareLocks(self, level):
3558
    if level == locking.LEVEL_NODE:
3559
      self._LockInstancesNodes()
3560

    
3561
  def BuildHooksEnv(self):
3562
    """Build hooks env.
3563

3564
    This runs on master, primary and secondary nodes of the instance.
3565

3566
    """
3567
    env = _BuildInstanceHookEnvByObject(self, self.instance)
3568
    env["MIGRATE_LIVE"] = self.op.live
3569
    env["MIGRATE_CLEANUP"] = self.op.cleanup
3570
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3571
    return env, nl, nl
3572

    
3573
  def CheckPrereq(self):
3574
    """Check prerequisites.
3575

3576
    This checks that the instance is in the cluster.
3577

3578
    """
3579
    instance = self.cfg.GetInstanceInfo(
3580
      self.cfg.ExpandInstanceName(self.op.instance_name))
3581
    if instance is None:
3582
      raise errors.OpPrereqError("Instance '%s' not known" %
3583
                                 self.op.instance_name)
3584

    
3585
    if instance.disk_template != constants.DT_DRBD8:
3586
      raise errors.OpPrereqError("Instance's disk layout is not"
3587
                                 " drbd8, cannot migrate.")
3588

    
3589
    secondary_nodes = instance.secondary_nodes
3590
    if not secondary_nodes:
3591
      raise errors.ConfigurationError("No secondary node but using"
3592
                                      " drbd8 disk template")
3593

    
3594
    i_be = self.cfg.GetClusterInfo().FillBE(instance)
3595

    
3596
    target_node = secondary_nodes[0]
3597
    # check memory requirements on the secondary node
3598
    _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3599
                         instance.name, i_be[constants.BE_MEMORY],
3600
                         instance.hypervisor)
3601

    
3602
    # check bridge existance
3603
    brlist = [nic.bridge for nic in instance.nics]
3604
    result = self.rpc.call_bridges_exist(target_node, brlist)
3605
    if result.failed or not result.data:
3606
      raise errors.OpPrereqError("One or more target bridges %s does not"
3607
                                 " exist on destination node '%s'" %
3608
                                 (brlist, target_node))
3609

    
3610
    if not self.op.cleanup:
3611
      _CheckNodeNotDrained(self, target_node)
3612
      result = self.rpc.call_instance_migratable(instance.primary_node,
3613
                                                 instance)
3614
      msg = result.RemoteFailMsg()
3615
      if msg:
3616
        raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3617
                                   msg)
3618

    
3619
    self.instance = instance
3620

    
3621
  def _WaitUntilSync(self):
3622
    """Poll with custom rpc for disk sync.
3623

3624
    This uses our own step-based rpc call.
3625

3626
    """
3627
    self.feedback_fn("* wait until resync is done")
3628
    all_done = False
3629
    while not all_done:
3630
      all_done = True
3631
      result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3632
                                            self.nodes_ip,
3633
                                            self.instance.disks)
3634
      min_percent = 100
3635
      for node, nres in result.items():
3636
        msg = nres.RemoteFailMsg()
3637
        if msg:
3638
          raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3639
                                   (node, msg))
3640
        node_done, node_percent = nres.payload
3641
        all_done = all_done and node_done
3642
        if node_percent is not None:
3643
          min_percent = min(min_percent, node_percent)
3644
      if not all_done:
3645
        if min_percent < 100:
3646
          self.feedback_fn("   - progress: %.1f%%" % min_percent)
3647
        time.sleep(2)
3648

    
3649
  def _EnsureSecondary(self, node):
3650
    """Demote a node to secondary.
3651

3652
    """
3653
    self.feedback_fn("* switching node %s to secondary mode" % node)
3654

    
3655
    for dev in self.instance.disks:
3656
      self.cfg.SetDiskID(dev, node)
3657

    
3658
    result = self.rpc.call_blockdev_close(node, self.instance.name,
3659
                                          self.instance.disks)
3660
    msg = result.RemoteFailMsg()
3661
    if msg:
3662
      raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3663
                               " error %s" % (node, msg))
3664

    
3665
  def _GoStandalone(self):
3666
    """Disconnect from the network.
3667

3668
    """
3669
    self.feedback_fn("* changing into standalone mode")
3670
    result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3671
                                               self.instance.disks)
3672
    for node, nres in result.items():
3673
      msg = nres.RemoteFailMsg()
3674
      if msg:
3675
        raise errors.OpExecError("Cannot disconnect disks node %s,"
3676
                                 " error %s" % (node, msg))
3677

    
3678
  def _GoReconnect(self, multimaster):
3679
    """Reconnect to the network.
3680

3681
    """
3682
    if multimaster:
3683
      msg = "dual-master"
3684
    else:
3685
      msg = "single-master"
3686
    self.feedback_fn("* changing disks into %s mode" % msg)
3687
    result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3688
                                           self.instance.disks,
3689
                                           self.instance.name, multimaster)
3690
    for node, nres in result.items():
3691
      msg = nres.RemoteFailMsg()
3692
      if msg:
3693
        raise errors.OpExecError("Cannot change disks config on node %s,"
3694
                                 " error: %s" % (node, msg))
3695

    
3696
  def _ExecCleanup(self):
3697
    """Try to cleanup after a failed migration.
3698

3699
    The cleanup is done by:
3700
      - check that the instance is running only on one node
3701
        (and update the config if needed)
3702
      - change disks on its secondary node to secondary
3703
      - wait until disks are fully synchronized
3704
      - disconnect from the network
3705
      - change disks into single-master mode
3706
      - wait again until disks are fully synchronized
3707

3708
    """
3709
    instance = self.instance
3710
    target_node = self.target_node
3711
    source_node = self.source_node
3712

    
3713
    # check running on only one node
3714
    self.feedback_fn("* checking where the instance actually runs"
3715
                     " (if this hangs, the hypervisor might be in"
3716
                     " a bad state)")
3717
    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3718
    for node, result in ins_l.items():
3719
      result.Raise()
3720
      if not isinstance(result.data, list):
3721
        raise errors.OpExecError("Can't contact node '%s'" % node)
3722

    
3723
    runningon_source = instance.name in ins_l[source_node].data
3724
    runningon_target = instance.name in ins_l[target_node].data
3725

    
3726
    if runningon_source and runningon_target:
3727
      raise errors.OpExecError("Instance seems to be running on two nodes,"
3728
                               " or the hypervisor is confused. You will have"
3729
                               " to ensure manually that it runs only on one"
3730
                               " and restart this operation.")
3731

    
3732
    if not (runningon_source or runningon_target):
3733
      raise errors.OpExecError("Instance does not seem to be running at all."
3734
                               " In this case, it's safer to repair by"
3735
                               " running 'gnt-instance stop' to ensure disk"
3736
                               " shutdown, and then restarting it.")
3737

    
3738
    if runningon_target:
3739
      # the migration has actually succeeded, we need to update the config
3740
      self.feedback_fn("* instance running on secondary node (%s),"
3741
                       " updating config" % target_node)
3742
      instance.primary_node = target_node
3743
      self.cfg.Update(instance)
3744
      demoted_node = source_node
3745
    else:
3746
      self.feedback_fn("* instance confirmed to be running on its"
3747
                       " primary node (%s)" % source_node)
3748
      demoted_node = target_node
3749

    
3750
    self._EnsureSecondary(demoted_node)
3751
    try:
3752
      self._WaitUntilSync()
3753
    except errors.OpExecError:
3754
      # we ignore here errors, since if the device is standalone, it
3755
      # won't be able to sync
3756
      pass
3757
    self._GoStandalone()
3758
    self._GoReconnect(False)
3759
    self._WaitUntilSync()
3760

    
3761
    self.feedback_fn("* done")
3762

    
3763
  def _RevertDiskStatus(self):
3764
    """Try to revert the disk status after a failed migration.
3765

3766
    """
3767
    target_node = self.target_node
3768
    try:
3769
      self._EnsureSecondary(target_node)
3770
      self._GoStandalone()
3771
      self._GoReconnect(False)
3772
      self._WaitUntilSync()
3773
    except errors.OpExecError, err:
3774
      self.LogWarning("Migration failed and I can't reconnect the"
3775
                      " drives: error '%s'\n"
3776
                      "Please look and recover the instance status" %
3777
                      str(err))
3778

    
3779
  def _AbortMigration(self):
3780
    """Call the hypervisor code to abort a started migration.
3781

3782
    """
3783
    instance = self.instance
3784
    target_node = self.target_node
3785
    migration_info = self.migration_info
3786

    
3787
    abort_result = self.rpc.call_finalize_migration(target_node,
3788
                                                    instance,
3789
                                                    migration_info,
3790
                                                    False)
3791
    abort_msg = abort_result.RemoteFailMsg()
3792
    if abort_msg:
3793
      logging.error("Aborting migration failed on target node %s: %s" %
3794
                    (target_node, abort_msg))
3795
      # Don't raise an exception here, as we stil have to try to revert the
3796
      # disk status, even if this step failed.
3797

    
3798
  def _ExecMigration(self):
3799
    """Migrate an instance.
3800

3801
    The migrate is done by:
3802
      - change the disks into dual-master mode
3803
      - wait until disks are fully synchronized again
3804
      - migrate the instance
3805
      - change disks on the new secondary node (the old primary) to secondary
3806
      - wait until disks are fully synchronized
3807
      - change disks into single-master mode
3808

3809
    """
3810
    instance = self.instance
3811
    target_node = self.target_node
3812
    source_node = self.source_node
3813

    
3814
    self.feedback_fn("* checking disk consistency between source and target")
3815
    for dev in instance.disks:
3816
      if not _CheckDiskConsistency(self, dev, target_node, False):
3817
        raise errors.OpExecError("Disk %s is degraded or not fully"
3818
                                 " synchronized on target node,"
3819
                                 " aborting migrate." % dev.iv_name)
3820

    
3821
    # First get the migration information from the remote node
3822
    result = self.rpc.call_migration_info(source_node, instance)
3823
    msg = result.RemoteFailMsg()
3824
    if msg:
3825
      log_err = ("Failed fetching source migration information from %s: %s" %
3826
                 (source_node, msg))
3827
      logging.error(log_err)
3828
      raise errors.OpExecError(log_err)
3829

    
3830
    self.migration_info = migration_info = result.payload
3831

    
3832
    # Then switch the disks to master/master mode
3833
    self._EnsureSecondary(target_node)
3834
    self._GoStandalone()
3835
    self._GoReconnect(True)
3836
    self._WaitUntilSync()
3837

    
3838
    self.feedback_fn("* preparing %s to accept the instance" % target_node)
3839
    result = self.rpc.call_accept_instance(target_node,
3840
                                           instance,
3841
                                           migration_info,
3842
                                           self.nodes_ip[target_node])
3843

    
3844
    msg = result.RemoteFailMsg()
3845
    if msg:
3846
      logging.error("Instance pre-migration failed, trying to revert"
3847
                    " disk status: %s", msg)
3848
      self._AbortMigration()
3849
      self._RevertDiskStatus()
3850
      raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
3851
                               (instance.name, msg))
3852

    
3853
    self.feedback_fn("* migrating instance to %s" % target_node)
3854
    time.sleep(10)
3855
    result = self.rpc.call_instance_migrate(source_node, instance,
3856
                                            self.nodes_ip[target_node],
3857
                                            self.op.live)
3858
    msg = result.RemoteFailMsg()
3859
    if msg:
3860
      logging.error("Instance migration failed, trying to revert"
3861
                    " disk status: %s", msg)
3862
      self._AbortMigration()
3863
      self._RevertDiskStatus()
3864
      raise errors.OpExecError("Could not migrate instance %s: %s" %
3865
                               (instance.name, msg))
3866
    time.sleep(10)
3867

    
3868
    instance.primary_node = target_node
3869
    # distribute new instance config to the other nodes
3870
    self.cfg.Update(instance)
3871

    
3872
    result = self.rpc.call_finalize_migration(target_node,
3873
                                              instance,
3874
                                              migration_info,
3875
                                              True)
3876
    msg = result.RemoteFailMsg()
3877
    if msg:
3878
      logging.error("Instance migration succeeded, but finalization failed:"
3879
                    " %s" % msg)
3880
      raise errors.OpExecError("Could not finalize instance migration: %s" %
3881
                               msg)
3882

    
3883
    self._EnsureSecondary(source_node)
3884
    self._WaitUntilSync()
3885
    self._GoStandalone()
3886
    self._GoReconnect(False)
3887
    self._WaitUntilSync()
3888

    
3889
    self.feedback_fn("* done")
3890

    
3891
  def Exec(self, feedback_fn):
3892
    """Perform the migration.
3893

3894
    """
3895
    self.feedback_fn = feedback_fn
3896

    
3897
    self.source_node = self.instance.primary_node
3898
    self.target_node = self.instance.secondary_nodes[0]
3899
    self.all_nodes = [self.source_node, self.target_node]
3900
    self.nodes_ip = {
3901
      self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
3902
      self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
3903
      }
3904
    if self.op.cleanup:
3905
      return self._ExecCleanup()
3906
    else:
3907
      return self._ExecMigration()
3908

    
3909

    
3910
def _CreateBlockDev(lu, node, instance, device, force_create,
3911
                    info, force_open):
3912
  """Create a tree of block devices on a given node.
3913

3914
  If this device type has to be created on secondaries, create it and
3915
  all its children.
3916

3917
  If not, just recurse to children keeping the same 'force' value.
3918

3919
  @param lu: the lu on whose behalf we execute
3920
  @param node: the node on which to create the device
3921
  @type instance: L{objects.Instance}
3922
  @param instance: the instance which owns the device
3923
  @type device: L{objects.Disk}
3924
  @param device: the device to create
3925
  @type force_create: boolean
3926
  @param force_create: whether to force creation of this device; this
3927
      will be change to True whenever we find a device which has
3928
      CreateOnSecondary() attribute
3929
  @param info: the extra 'metadata' we should attach to the device
3930
      (this will be represented as a LVM tag)
3931
  @type force_open: boolean
3932
  @param force_open: this parameter will be passes to the
3933
      L{backend.BlockdevCreate} function where it specifies
3934
      whether we run on primary or not, and it affects both
3935
      the child assembly and the device own Open() execution
3936

3937
  """
3938
  if device.CreateOnSecondary():
3939
    force_create = True
3940

    
3941
  if device.children:
3942
    for child in device.children:
3943
      _CreateBlockDev(lu, node, instance, child, force_create,
3944
                      info, force_open)
3945

    
3946
  if not force_create:
3947
    return
3948

    
3949
  _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
3950

    
3951

    
3952
def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
3953
  """Create a single block device on a given node.
3954

3955
  This will not recurse over children of the device, so they must be
3956
  created in advance.
3957

3958
  @param lu: the lu on whose behalf we execute
3959
  @param node: the node on which to create the device
3960
  @type instance: L{objects.Instance}
3961
  @param instance: the instance which owns the device
3962
  @type device: L{objects.Disk}
3963
  @param device: the device to create
3964
  @param info: the extra 'metadata' we should attach to the device
3965
      (this will be represented as a LVM tag)
3966
  @type force_open: boolean
3967
  @param force_open: this parameter will be passes to the
3968
      L{backend.BlockdevCreate} function where it specifies
3969
      whether we run on primary or not, and it affects both
3970
      the child assembly and the device own Open() execution
3971

3972
  """
3973
  lu.cfg.SetDiskID(device, node)
3974
  result = lu.rpc.call_blockdev_create(node, device, device.size,
3975
                                       instance.name, force_open, info)
3976
  msg = result.RemoteFailMsg()
3977
  if msg:
3978
    raise errors.OpExecError("Can't create block device %s on"
3979
                             " node %s for instance %s: %s" %
3980
                             (device, node, instance.name, msg))
3981
  if device.physical_id is None:
3982
    device.physical_id = result.payload
3983

    
3984

    
3985
def _GenerateUniqueNames(lu, exts):
3986
  """Generate a suitable LV name.
3987

3988
  This will generate a logical volume name for the given instance.
3989

3990
  """
3991
  results = []
3992
  for val in exts:
3993
    new_id = lu.cfg.GenerateUniqueID()
3994
    results.append("%s%s" % (new_id, val))
3995
  return results
3996

    
3997

    
3998
def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3999
                         p_minor, s_minor):
4000
  """Generate a drbd8 device complete with its children.
4001

4002
  """
4003
  port = lu.cfg.AllocatePort()
4004
  vgname = lu.cfg.GetVGName()
4005
  shared_secret = lu.cfg.GenerateDRBDSecret()
4006
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4007
                          logical_id=(vgname, names[0]))
4008
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4009
                          logical_id=(vgname, names[1]))
4010
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4011
                          logical_id=(primary, secondary, port,
4012
                                      p_minor, s_minor,
4013
                                      shared_secret),
4014
                          children=[dev_data, dev_meta],
4015
                          iv_name=iv_name)
4016
  return drbd_dev
4017

    
4018

    
4019
def _GenerateDiskTemplate(lu, template_name,
4020
                          instance_name, primary_node,
4021
                          secondary_nodes, disk_info,
4022
                          file_storage_dir, file_driver,
4023
                          base_index):
4024
  """Generate the entire disk layout for a given template type.
4025

4026
  """
4027
  #TODO: compute space requirements
4028

    
4029
  vgname = lu.cfg.GetVGName()
4030
  disk_count = len(disk_info)
4031
  disks = []
4032
  if template_name == constants.DT_DISKLESS:
4033
    pass
4034
  elif template_name == constants.DT_PLAIN:
4035
    if len(secondary_nodes) != 0:
4036
      raise errors.ProgrammerError("Wrong template configuration")
4037

    
4038
    names = _GenerateUniqueNames(lu, [".disk%d" % i
4039
                                      for i in range(disk_count)])
4040
    for idx, disk in enumerate(disk_info):
4041
      disk_index = idx + base_index
4042
      disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4043
                              logical_id=(vgname, names[idx]),
4044
                              iv_name="disk/%d" % disk_index,
4045
                              mode=disk["mode"])
4046
      disks.append(disk_dev)
4047
  elif template_name == constants.DT_DRBD8:
4048
    if len(secondary_nodes) != 1:
4049
      raise errors.ProgrammerError("Wrong template configuration")
4050
    remote_node = secondary_nodes[0]
4051
    minors = lu.cfg.AllocateDRBDMinor(
4052
      [primary_node, remote_node] * len(disk_info), instance_name)
4053

    
4054
    names = []
4055
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4056
                                               for i in range(disk_count)]):
4057
      names.append(lv_prefix + "_data")
4058
      names.append(lv_prefix + "_meta")
4059
    for idx, disk in enumerate(disk_info):
4060
      disk_index = idx + base_index
4061
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4062
                                      disk["size"], names[idx*2:idx*2+2],
4063
                                      "disk/%d" % disk_index,
4064
                                      minors[idx*2], minors[idx*2+1])
4065
      disk_dev.mode = disk["mode"]
4066
      disks.append(disk_dev)
4067
  elif template_name == constants.DT_FILE:
4068
    if len(secondary_nodes) != 0:
4069
      raise errors.ProgrammerError("Wrong template configuration")
4070

    
4071
    for idx, disk in enumerate(disk_info):
4072
      disk_index = idx + base_index
4073
      disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4074
                              iv_name="disk/%d" % disk_index,
4075
                              logical_id=(file_driver,
4076
                                          "%s/disk%d" % (file_storage_dir,
4077
                                                         disk_index)),
4078
                              mode=disk["mode"])
4079
      disks.append(disk_dev)
4080
  else:
4081
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4082
  return disks
4083

    
4084

    
4085
def _GetInstanceInfoText(instance):
4086
  """Compute that text that should be added to the disk's metadata.
4087

4088
  """
4089
  return "originstname+%s" % instance.name
4090

    
4091

    
4092
def _CreateDisks(lu, instance):
4093
  """Create all disks for an instance.
4094

4095
  This abstracts away some work from AddInstance.
4096

4097
  @type lu: L{LogicalUnit}
4098
  @param lu: the logical unit on whose behalf we execute
4099
  @type instance: L{objects.Instance}
4100
  @param instance: the instance whose disks we should create
4101
  @rtype: boolean
4102
  @return: the success of the creation
4103

4104
  """
4105
  info = _GetInstanceInfoText(instance)
4106
  pnode = instance.primary_node
4107

    
4108
  if instance.disk_template == constants.DT_FILE:
4109
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4110
    result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4111

    
4112
    if result.failed or not result.data:
4113
      raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4114

    
4115
    if not result.data[0]:
4116
      raise errors.OpExecError("Failed to create directory '%s'" %
4117
                               file_storage_dir)
4118

    
4119
  # Note: this needs to be kept in sync with adding of disks in
4120
  # LUSetInstanceParams
4121
  for device in instance.disks:
4122
    logging.info("Creating volume %s for instance %s",
4123
                 device.iv_name, instance.name)
4124
    #HARDCODE
4125
    for node in instance.all_nodes:
4126
      f_create = node == pnode
4127
      _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4128

    
4129

    
4130
def _RemoveDisks(lu, instance):
4131
  """Remove all disks for an instance.
4132

4133
  This abstracts away some work from `AddInstance()` and
4134
  `RemoveInstance()`. Note that in case some of the devices couldn't
4135
  be removed, the removal will continue with the other ones (compare
4136
  with `_CreateDisks()`).
4137

4138
  @type lu: L{LogicalUnit}
4139
  @param lu: the logical unit on whose behalf we execute
4140
  @type instance: L{objects.Instance}
4141
  @param instance: the instance whose disks we should remove
4142
  @rtype: boolean
4143
  @return: the success of the removal
4144

4145
  """
4146
  logging.info("Removing block devices for instance %s", instance.name)
4147

    
4148
  all_result = True
4149
  for device in instance.disks:
4150
    for node, disk in device.ComputeNodeTree(instance.primary_node):
4151
      lu.cfg.SetDiskID(disk, node)
4152
      msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4153
      if msg:
4154
        lu.LogWarning("Could not remove block device %s on node %s,"
4155
                      " continuing anyway: %s", device.iv_name, node, msg)
4156
        all_result = False
4157

    
4158
  if instance.disk_template == constants.DT_FILE:
4159
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4160
    result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4161
                                                 file_storage_dir)
4162
    if result.failed or not result.data:
4163
      logging.error("Could not remove directory '%s'", file_storage_dir)
4164
      all_result = False
4165

    
4166
  return all_result
4167

    
4168

    
4169
def _ComputeDiskSize(disk_template, disks):
4170
  """Compute disk size requirements in the volume group
4171

4172
  """
4173
  # Required free disk space as a function of disk and swap space
4174
  req_size_dict = {
4175
    constants.DT_DISKLESS: None,
4176
    constants.DT_PLAIN: sum(d["size"] for d in disks),
4177
    # 128 MB are added for drbd metadata for each disk
4178
    constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4179
    constants.DT_FILE: None,
4180
  }
4181

    
4182
  if disk_template not in req_size_dict:
4183
    raise errors.ProgrammerError("Disk template '%s' size requirement"
4184
                                 " is unknown" %  disk_template)
4185

    
4186
  return req_size_dict[disk_template]
4187

    
4188

    
4189
def _CheckHVParams(lu, nodenames, hvname, hvparams):
4190
  """Hypervisor parameter validation.
4191

4192
  This function abstract the hypervisor parameter validation to be
4193
  used in both instance create and instance modify.
4194

4195
  @type lu: L{LogicalUnit}
4196
  @param lu: the logical unit for which we check
4197
  @type nodenames: list
4198
  @param nodenames: the list of nodes on which we should check
4199
  @type hvname: string
4200
  @param hvname: the name of the hypervisor we should use
4201
  @type hvparams: dict
4202
  @param hvparams: the parameters which we need to check
4203
  @raise errors.OpPrereqError: if the parameters are not valid
4204

4205
  """
4206
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4207
                                                  hvname,
4208
                                                  hvparams)
4209
  for node in nodenames:
4210
    info = hvinfo[node]
4211
    if info.offline:
4212
      continue
4213
    msg = info.RemoteFailMsg()
4214
    if msg:
4215
      raise errors.OpPrereqError("Hypervisor parameter validation failed:"
4216
                                 " %s" % msg)
4217

    
4218

    
class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disks", "disk_template",
              "mode", "start",
              "wait_for_sync", "ip_check", "nics",
              "hvparams", "beparams"]
  REQ_BGL = False

  def _ExpandNode(self, node):
    """Expands and checks one node name.

    """
    node_full = self.cfg.ExpandNodeName(node)
    if node_full is None:
      raise errors.OpPrereqError("Unknown node %s" % node)
    return node_full

  def ExpandNames(self):
    """ExpandNames for CreateInstance.

    Figure out the right locks for instance creation.

    """
    self.needed_locks = {}

    # set optional parameters to none if they don't exist
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    # cheap checks, mostly valid constants given

    # verify creation mode
    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    if self.op.hypervisor is None:
      self.op.hypervisor = self.cfg.GetHypervisorType()

    cluster = self.cfg.GetClusterInfo()
    enabled_hvs = cluster.enabled_hypervisors
    if self.op.hypervisor not in enabled_hvs:
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
                                 " cluster (%s)" % (self.op.hypervisor,
                                  ",".join(enabled_hvs)))

    # check hypervisor parameter syntax (locally)
    utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
    filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
                                  self.op.hvparams)
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
    hv_type.CheckParameterSyntax(filled_hvp)

    # fill and remember the beparams dict
    utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
    self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                    self.op.beparams)

    #### instance parameters check

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)
    self.op.instance_name = instance_name = hostname1.name

    # this is just a preventive check, but someone might still add this
    # instance in the meantime, and creation will fail at lock-add time
    if instance_name in self.cfg.GetInstanceList():
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    self.add_locks[locking.LEVEL_INSTANCE] = instance_name

    # NIC buildup
    self.nics = []
    for nic in self.op.nics:
      # ip validity checks
      ip = nic.get("ip", None)
      if ip is None or ip.lower() == "none":
        nic_ip = None
      elif ip.lower() == constants.VALUE_AUTO:
        nic_ip = hostname1.ip
      else:
        if not utils.IsValidIP(ip):
          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
                                     " like a valid IP" % ip)
        nic_ip = ip

      # MAC address verification
      mac = nic.get("mac", constants.VALUE_AUTO)
      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
        if not utils.IsValidMac(mac.lower()):
          raise errors.OpPrereqError("Invalid MAC address specified: %s" %
                                     mac)
      # bridge verification
      bridge = nic.get("bridge", None)
      if bridge is None:
        bridge = self.cfg.GetDefBridge()
      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))

    # disk checks/pre-build
    self.disks = []
    for disk in self.op.disks:
      mode = disk.get("mode", constants.DISK_RDWR)
      if mode not in constants.DISK_ACCESS_SET:
        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                   mode)
      size = disk.get("size", None)
      if size is None:
        raise errors.OpPrereqError("Missing disk size")
      try:
        size = int(size)
      except ValueError:
        raise errors.OpPrereqError("Invalid disk size '%s'" % size)
      self.disks.append({"size": size, "mode": mode})

    # used in CheckPrereq for ip ping check
    self.check_ip = hostname1.ip

    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory path not absolute")

    ### Node/iallocator related checks
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.op.pnode = self._ExpandNode(self.op.pnode)
      nodelist = [self.op.pnode]
      if self.op.snode is not None:
        self.op.snode = self._ExpandNode(self.op.snode)
        nodelist.append(self.op.snode)
      self.needed_locks[locking.LEVEL_NODE] = nodelist

    # in case of import lock the source node too
    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)

      if src_path is None:
        self.op.src_path = src_path = self.op.instance_name

      if src_node is None:
        self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
        self.op.src_node = None
        if os.path.isabs(src_path):
          raise errors.OpPrereqError("Importing an instance from an absolute"
                                     " path requires a source node option.")
      else:
        self.op.src_node = src_node = self._ExpandNode(src_node)
        if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
          self.needed_locks[locking.LEVEL_NODE].append(src_node)
        if not os.path.isabs(src_path):
          self.op.src_path = src_path = \
            os.path.join(constants.EXPORT_DIR, src_path)

    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    nics = [n.ToDict() for n in self.nics]
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.be_full[constants.BE_VCPUS],
                     mem_size=self.be_full[constants.BE_MEMORY],
                     disks=self.disks,
                     nics=nics,
                     hypervisor=self.op.hypervisor,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 ", ".join(ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["SRC_NODE"] = self.op.src_node
      env["SRC_PATH"] = self.op.src_path
      env["SRC_IMAGES"] = self.src_images

    env.update(_BuildInstanceHookEnv(
      name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.op.start,
      os_type=self.op.os_type,
      memory=self.be_full[constants.BE_MEMORY],
      vcpus=self.be_full[constants.BE_VCPUS],
      nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
      disk_template=self.op.disk_template,
      disks=[(d["size"], d["mode"]) for d in self.disks],
    ))

    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl


  def CheckPrereq(self):
    """Check prerequisites.

    """
    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = self.op.src_node
      src_path = self.op.src_path

      if src_node is None:
        exp_list = self.rpc.call_export_list(
          self.acquired_locks[locking.LEVEL_NODE])
        found = False
        for node in exp_list:
          if not exp_list[node].failed and src_path in exp_list[node].data:
            found = True
            self.op.src_node = src_node = node
            self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
                                                       src_path)
            break
        if not found:
          raise errors.OpPrereqError("No export found for relative path %s" %
                                      src_path)

      _CheckNodeOnline(self, src_node)
      result = self.rpc.call_export_info(src_node, src_path)
      result.Raise()
      if not result.data:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      export_info = result.data
      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      # Check that the new instance doesn't have fewer disks than the export
      instance_disks = len(self.disks)
      export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
      if instance_disks < export_disks:
        raise errors.OpPrereqError("Not enough disks to import."
                                   " (instance: %d, export: %d)" %
                                   (instance_disks, export_disks))

      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      disk_images = []
      for idx in range(export_disks):
        option = 'disk%d_dump' % idx
        if export_info.has_option(constants.INISECT_INS, option):
          # FIXME: are the old os-es, disk sizes, etc. useful?
          export_name = export_info.get(constants.INISECT_INS, option)
          image = os.path.join(src_path, export_name)
          disk_images.append(image)
        else:
          disk_images.append(False)

      self.src_images = disk_images

      old_name = export_info.get(constants.INISECT_INS, 'name')
      # FIXME: int() here could throw a ValueError on broken exports
      exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
      if self.op.instance_name == old_name:
        for idx, nic in enumerate(self.nics):
          if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
            nic_mac_ini = 'nic%d_mac' % idx
            nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)

    # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
    # ip ping checks (we use the same ip that was resolved in ExpandNames)
    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (self.check_ip, self.op.instance_name))

    #### mac address generation
    # By generating here the mac address both the allocator and the hooks get
    # the real final mac address rather than the 'auto' or 'generate' value.
    # There is a race condition between the generation and the instance object
    # creation, which means that we know the mac is valid now, but we're not
    # sure it will be when we actually add the instance. If things go bad
    # adding the instance will abort because of a duplicate mac, and the
    # creation job will fail.
    for nic in self.nics:
      if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
        nic.mac = self.cfg.GenerateMAC()

    #### allocator run

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
    assert self.pnode is not None, \
      "Cannot retrieve locked node %s" % self.op.pnode
    if pnode.offline:
      raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
                                 pnode.name)
    if pnode.drained:
      raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
                                 pnode.name)

    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if self.op.snode is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")
      if self.op.snode == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      _CheckNodeOnline(self, self.op.snode)
      _CheckNodeNotDrained(self, self.op.snode)
      self.secondaries.append(self.op.snode)

    nodenames = [pnode.name] + self.secondaries

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.disks)

    # Check lv size requirements
    if req_size is not None:
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                         self.op.hypervisor)
      for node in nodenames:
        info = nodeinfo[node]
        info.Raise()
        info = info.data
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)

    # os verification
    result = self.rpc.call_os_get(pnode.name, self.op.os_type)
    result.Raise()
    if not isinstance(result.data, objects.OS):
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node" % self.op.os_type)

    # bridge check on primary node
    bridges = [n.bridge for n in self.nics]
    result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
    result.Raise()
    if not result.data:
      raise errors.OpPrereqError("One of the target bridges '%s' does not"
                                 " exist on destination node '%s'" %
                                 (",".join(bridges), pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.be_full[constants.BE_MEMORY],
                           self.op.hypervisor)

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    ht_kind = self.op.hypervisor
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    ##if self.op.vnc_bind_address is None:
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.cfg.GetFileStorageDir(),
                                        string_file_storage_dir, instance))

    disks = _GenerateDiskTemplate(self,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries,
                                  self.disks,
                                  file_storage_dir,
                                  self.op.file_driver,
                                  0)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            nics=self.nics, disks=disks,
                            disk_template=self.op.disk_template,
                            admin_up=False,
                            network_port=network_port,
                            beparams=self.op.beparams,
                            hvparams=self.op.hvparams,
                            hypervisor=self.op.hypervisor,
                            )

    feedback_fn("* creating instance disks...")
    try:
      _CreateDisks(self, iobj)
    except errors.OpExecError:
      self.LogWarning("Device creation failed, reverting...")
      try:
        _RemoveDisks(self, iobj)
      finally:
        self.cfg.ReleaseDRBDMinors(instance)
        raise

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Declare that we don't want to remove the instance lock anymore, as we've
    # added the instance to the config
    del self.remove_locks[locking.LEVEL_INSTANCE]
    # Unlock all the nodes
    if self.op.mode == constants.INSTANCE_IMPORT:
      nodes_keep = [self.op.src_node]
      nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
                       if node != self.op.src_node]
      self.context.glm.release(locking.LEVEL_NODE, nodes_release)
      self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
    else:
      self.context.glm.release(locking.LEVEL_NODE)
      del self.acquired_locks[locking.LEVEL_NODE]

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, iobj)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(self, iobj)
      self.cfg.RemoveInstance(iobj.name)
      # Make sure the instance lock gets removed
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        result = self.rpc.call_instance_os_add(pnode_name, iobj)
        msg = result.RemoteFailMsg()
        if msg:
          raise errors.OpExecError("Could not add os for instance %s"
                                   " on node %s: %s" %
                                   (instance, pnode_name, msg))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_images = self.src_images
        cluster_name = self.cfg.GetClusterName()
        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
                                                         src_node, src_images,
                                                         cluster_name)
        import_result.Raise()
        for idx, result in enumerate(import_result.data):
          if not result:
            self.LogWarning("Could not import the image %s for instance"
                            " %s, disk %d, on node %s" %
                            (src_images[idx], instance, idx, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      iobj.admin_up = True
      self.cfg.Update(iobj)
      logging.info("Starting instance %s on node %s", instance, pnode_name)
      feedback_fn("* starting instance...")
      result = self.rpc.call_instance_start(pnode_name, iobj)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not start instance: %s" % msg)

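
# Illustrative sketch (assumption: the matching opcode class is
# opcodes.OpCreateInstance, which is defined outside this file): the fields
# below mirror _OP_REQP plus the optional pnode handled in ExpandNames above;
# all values are sample data, not defaults taken from this module.
def _ExampleCreateInstanceOp():
  """Sketch: build a minimal instance-creation opcode for a plain LVM disk."""
  return opcodes.OpCreateInstance(instance_name="instance1.example.com",
                                  mode=constants.INSTANCE_CREATE,
                                  os_type="debootstrap",
                                  disk_template=constants.DT_PLAIN,
                                  disks=[{"size": 1024}],
                                  nics=[{"mac": constants.VALUE_AUTO}],
                                  pnode="node1.example.com",
                                  start=True, ip_check=True,
                                  wait_for_sync=True,
                                  hvparams={}, beparams={})
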

    
class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = self.rpc.call_instance_list([node],
                                             [instance.hypervisor])[node]
    node_insts.Raise()

    if instance.name not in node_insts.data:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logging.debug("Connecting to console of %s on %s", instance.name, node)

    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    cluster = self.cfg.GetClusterInfo()
    # beparams and hvparams are passed separately, to avoid editing the
    # instance and then saving the defaults in the instance itself.
    hvparams = cluster.FillHV(instance)
    beparams = cluster.FillBE(instance)
    console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)

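
# Illustrative sketch (assumption: BuildCmd returns an argv list, which is not
# guaranteed by anything in this file): the caller is expected to run the
# command returned by LUConnectConsole.Exec directly on the master node.
def _ExampleRunConsole(console_argv):
  """Sketch: replace the current process with the returned ssh invocation."""
  os.execvp(console_argv[0], console_argv)
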

    
class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]
  REQ_BGL = False

  def CheckArguments(self):
    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None
    if not hasattr(self.op, "iallocator"):
      self.op.iallocator = None

    # check for valid parameter combination
    cnt = [self.op.remote_node, self.op.iallocator].count(None)
    if self.op.mode == constants.REPLACE_DISK_CHG:
      if cnt == 2:
        raise errors.OpPrereqError("When changing the secondary either an"
                                   " iallocator script must be used or the"
                                   " new node given")
      elif cnt == 0:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
    else: # not replacing the secondary
      if cnt != 2:
        raise errors.OpPrereqError("The iallocator and new node options can"
                                   " be used only when changing the"
                                   " secondary node")

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    if self.op.iallocator is not None:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    elif self.op.remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.op.remote_node = remote_node
      # Warning: do not remove the locking of the new secondary here
      # unless DRBD8.AddChildren is changed to work in parallel;
      # currently it doesn't since parallel invocations of
      # FindUnusedMinor will conflict
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    # If we're not already locking all nodes in the set we have to declare the
    # instance's primary/secondary nodes.
    if (level == locking.LEVEL_NODE and
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
      self._LockInstancesNodes()

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    self.LogInfo("Selected new secondary for the instance: %s",
                 self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

    if instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
                                 " instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    if self.op.iallocator is not None:
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      raise errors.OpPrereqError("The specified node is already the"
                                 " secondary node of the instance.")

    if self.op.mode == constants.REPLACE_DISK_PRI:
      n1 = self.tgt_node = instance.primary_node
      n2 = self.oth_node = self.sec_node
    elif self.op.mode == constants.REPLACE_DISK_SEC:
      n1 = self.tgt_node = self.sec_node
      n2 = self.oth_node = instance.primary_node
    elif self.op.mode == constants.REPLACE_DISK_CHG:
      n1 = self.new_node = remote_node
      n2 = self.oth_node = instance.primary_node
      self.tgt_node = self.sec_node
      _CheckNodeNotDrained(self, remote_node)
    else:
      raise errors.ProgrammerError("Unhandled disk replace mode")

    _CheckNodeOnline(self, n1)
    _CheckNodeOnline(self, n2)

    if not self.op.disks:
      self.op.disks = range(len(instance.disks))

    for disk_idx in self.op.disks:
      instance.FindDisk(disk_idx)

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:

      1. for each disk to be replaced:

        1. create new LVs on the target node with unique names
        1. detach old LVs from the drbd device
        1. rename old LVs to name_replaced.<time_t>
        1. rename new LVs to old LVs
        1. attach the new LVs (with the old names now) to the drbd device

      1. wait for sync across all devices

      1. for each modified disk:

        1. remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = self.rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results[node]
      if res.failed or not res.data or my_vg not in res.data:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking disk/%d on %s" % (idx, node))
        cfg.SetDiskID(dev, node)
        result = self.rpc.call_blockdev_find(node, dev)
        msg = result.RemoteFailMsg()
        if not msg and not result.payload:
          msg = "disk not found"
        if msg:
          raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
                                   (idx, node, msg))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      info("checking disk/%d consistency on %s" % (idx, oth_node))
      if not _CheckDiskConsistency(self, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".disk%d_%s" % (idx, suf)
                  for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(self, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # we pass force_create=True to force the LVM creation
      for new_lv in new_lvs:
        _CreateBlockDev(self, tgt_node, instance, new_lv, True,
                        _GetInstanceInfoText(instance), False)

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        result = self.rpc.call_blockdev_find(tgt_node, to_ren)
        if not result.RemoteFailMsg() and result.payload:
          # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
      if result.failed or not result.data:
        for new_lv in new_lvs:
          msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
          if msg:
            warning("Can't rollback device %s: %s", dev, msg,
                    hint="cleanup manually the unused logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(self, instance, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      result = self.rpc.call_blockdev_find(instance.primary_node, dev)
      msg = result.RemoteFailMsg()
      if not msg and not result.payload:
        msg = "disk not found"
      if msg:
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
                                 (name, msg))
      if result.payload[5]:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
        if msg:
          warning("Can't remove old LV: %s" % msg,
                  hint="manually remove unused LVs")
          continue

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node
    nodes_ip = {
      old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
      new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
      pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
      }

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = self.rpc.call_vg_list([pri_node, new_node])
    for node in pri_node, new_node:
      res = results[node]
      if res.failed or not res.data or my_vg not in res.data:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      info("checking disk/%d on %s" % (idx, pri_node))
      cfg.SetDiskID(dev, pri_node)
      result = self.rpc.call_blockdev_find(pri_node, dev)
      msg = result.RemoteFailMsg()
      if not msg and not result.payload:
        msg = "disk not found"
      if msg:
        raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
                                 (idx, pri_node, msg))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      info("checking disk/%d consistency on %s" % (idx, pri_node))
      if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for idx, dev in enumerate(instance.disks):
      info("adding new local storage on %s for disk/%d" %
           (new_node, idx))
      # we pass force_create=True to force LVM creation
      for new_lv in dev.children:
        _CreateBlockDev(self, new_node, instance, new_lv, True,
                        _GetInstanceInfoText(instance), False)

    # Step 4: drbd minors and drbd setup changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
                                   instance.name)
    logging.debug("Allocated minors %s" % (minors,))
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
      size = dev.size
      info("activating a new drbd on %s for disk/%d" % (new_node, idx))
      # create new devices on new_node; note that we create two IDs:
      # one without port, so the drbd will be activated without
      # networking information on the new node at this stage, and one
      # with network, for the latter activation in step 4
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
      if pri_node == o_node1:
        p_minor = o_minor1
      else:
        p_minor = o_minor2

      new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
      new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)

      iv_names[idx] = (dev, dev.children, new_net_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_net_id)
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=new_alone_id,
                              children=dev.children)
      try:
        _CreateSingleBlockDev(self, new_node, instance, new_drbd,
                              _GetInstanceInfoText(instance), False)
      except errors.BlockDeviceError:
        self.cfg.ReleaseDRBDMinors(instance.name)
        raise

    for idx, dev in enumerate(instance.disks):
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for disk/%d on old node" % idx)
      cfg.SetDiskID(dev, old_node)
      msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
      if msg:
        warning("Failed to shutdown drbd for disk/%d on old node: %s" %
                (idx, msg),
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
                                               instance.disks)[pri_node]

    msg = result.RemoteFailMsg()
    if msg:
      # detaches didn't succeed (unlikely)
      self.cfg.ReleaseDRBDMinors(instance.name)
      raise errors.OpExecError("Can't detach the disks from the network on"
                               " old node: %s" % (msg,))

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev, _, new_logical_id in iv_names.itervalues():
      dev.logical_id = new_logical_id
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
                                           instance.disks, instance.name,
                                           False)
    for to_node, to_result in result.items():
      msg = to_result.RemoteFailMsg()
      if msg:
        warning("can't attach drbd disks on node %s: %s", to_node, msg,
                hint="please do a gnt-instance info to see the"
                " status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(self, instance, unlock=True)

    # so check manually all the devices
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      result = self.rpc.call_blockdev_find(pri_node, dev)
      msg = result.RemoteFailMsg()
      if not msg and not result.payload:
        msg = "disk not found"
      if msg:
        raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
                                 (idx, msg))
      if result.payload[5]:
        raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
      info("remove logical volumes for disk/%d" % idx)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
        if msg:
          warning("Can't remove LV on old secondary: %s", msg,
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance

    # Activate the instance disks if we're replacing them on a down instance
    if not instance.admin_up:
      _StartInstanceDisks(self, instance, True)

    if self.op.mode == constants.REPLACE_DISK_CHG:
      fn = self._ExecD8Secondary
    else:
      fn = self._ExecD8DiskOnly

    ret = fn(feedback_fn)

    # Deactivate the instance disks if we're replacing them on a down instance
    if not instance.admin_up:
      _SafeShutdownInstanceDisks(self, instance)

    return ret

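
# Illustrative sketch (not used by the LU code): the two rename lists that
# _ExecD8DiskOnly builds in its detach/rename/rename/attach step, shown on
# plain (vg, lv_name) pairs instead of Disk objects.
def _ExampleRenameLists(old_ids, new_ids, temp_suffix):
  """Sketch: park the old LVs under '_replaced' names, then give the new LVs
  the original names, mirroring the ordering used above.

  """
  old_to_temp = [(vg, name + "_replaced-%s" % temp_suffix)
                 for vg, name in old_ids]
  renames_out = zip(old_ids, old_to_temp)   # old LV -> parking name
  renames_in = zip(new_ids, old_ids)        # new LV -> original name
  return renames_out, renames_in
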

    
class LUGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    nodenames = list(instance.all_nodes)
    for node in nodenames:
      _CheckNodeOnline(self, node)

    self.instance = instance

    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing.")

    self.disk = instance.FindDisk(self.op.disk)

    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                       instance.hypervisor)
    for node in nodenames:
      info = nodeinfo[node]
      if info.failed or not info.data:
        raise errors.OpPrereqError("Cannot get current information"
                                   " from node '%s'" % node)
      vg_free = info.data.get('vg_free', None)
      if not isinstance(vg_free, int):
        raise errors.OpPrereqError("Can't compute free disk space on"
                                   " node %s" % node)
      if self.op.amount > vg_free:
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                   " %d MiB available, %d MiB required" %
                                   (node, vg_free, self.op.amount))

  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    disk = self.disk
    for node in instance.all_nodes:
      self.cfg.SetDiskID(disk, node)
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Grow request failed to node %s: %s" %
                                 (node, msg))
    disk.RecordGrow(self.op.amount)
    self.cfg.Update(instance)
    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, instance)
      if disk_abort:
        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
                             " status.\nPlease check the instance.")

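
# Illustrative sketch (assumption: the matching opcode class is
# opcodes.OpGrowDisk; only the fields listed in LUGrowDisk._OP_REQP above
# are filled in, with sample values).
def _ExampleGrowDiskOp():
  """Sketch: grow disk 0 of an instance by 2048 MiB and wait for resync."""
  return opcodes.OpGrowDisk(instance_name="instance1.example.com",
                            disk=0, amount=2048,
                            wait_for_sync=True)
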

    
class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances", "static"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = self.cfg.ExpandInstanceName(name)
        if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" % name)
        self.wanted_names.append(full_name)
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
    else:
      self.wanted_names = None
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]
    return

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    static = self.op.static
    if not static:
      self.cfg.SetDiskID(dev, instance.primary_node)
      dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
      if dev_pstatus.offline:
        dev_pstatus = None
      else:
        msg = dev_pstatus.RemoteFailMsg()
        if msg:
          raise errors.OpExecError("Can't compute disk status for %s: %s" %
                                   (instance.name, msg))
        dev_pstatus = dev_pstatus.payload
    else:
      dev_pstatus = None

    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode and not static:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
      if dev_sstatus.offline:
        dev_sstatus = None
      else:
        msg = dev_sstatus.RemoteFailMsg()
        if msg:
          raise errors.OpExecError("Can't compute disk status for %s: %s" %
                                   (instance.name, msg))
        dev_sstatus = dev_sstatus.payload
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      "mode": dev.mode,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}

    cluster = self.cfg.GetClusterInfo()

    for instance in self.wanted_instances:
      if not self.op.static:
        remote_info = self.rpc.call_instance_info(instance.primary_node,
                                                  instance.name,
                                                  instance.hypervisor)
        remote_info.Raise()
        remote_info = remote_info.data
        if remote_info and "state" in remote_info:
          remote_state = "up"
        else:
          remote_state = "down"
      else:
        remote_state = None
      if instance.admin_up:
        config_state = "up"
      else:
        config_state = "down"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "hypervisor": instance.hypervisor,
        "network_port": instance.network_port,
        "hv_instance": instance.hvparams,
        "hv_actual": cluster.FillHV(instance),
        "be_instance": instance.beparams,
        "be_actual": cluster.FillBE(instance),
        }

      result[instance.name] = idict

    return result
5647

    
5648

    
5649
class LUSetInstanceParams(LogicalUnit):
5650
  """Modifies an instances's parameters.
5651

5652
  """
5653
  HPATH = "instance-modify"
5654
  HTYPE = constants.HTYPE_INSTANCE
5655
  _OP_REQP = ["instance_name"]
5656
  REQ_BGL = False
5657

    
5658
  def CheckArguments(self):
5659
    if not hasattr(self.op, 'nics'):
5660
      self.op.nics = []
5661
    if not hasattr(self.op, 'disks'):
5662
      self.op.disks = []
5663
    if not hasattr(self.op, 'beparams'):
5664
      self.op.beparams = {}
5665
    if not hasattr(self.op, 'hvparams'):
5666
      self.op.hvparams = {}
5667
    self.op.force = getattr(self.op, "force", False)
5668
    if not (self.op.nics or self.op.disks or
5669
            self.op.hvparams or self.op.beparams):
5670
      raise errors.OpPrereqError("No changes submitted")
5671

    
5672
    # Disk validation
5673
    disk_addremove = 0
5674
    for disk_op, disk_dict in self.op.disks:
5675
      if disk_op == constants.DDM_REMOVE:
5676
        disk_addremove += 1
5677
        continue
5678
      elif disk_op == constants.DDM_ADD:
5679
        disk_addremove += 1
5680
      else:
5681
        if not isinstance(disk_op, int):
5682
          raise errors.OpPrereqError("Invalid disk index")
5683
      if disk_op == constants.DDM_ADD:
5684
        mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5685
        if mode not in constants.DISK_ACCESS_SET:
5686
          raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5687
        size = disk_dict.get('size', None)
5688
        if size is None:
5689
          raise errors.OpPrereqError("Required disk parameter size missing")
5690
        try:
5691
          size = int(size)
5692
        except ValueError, err:
5693
          raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5694
                                     str(err))
5695
        disk_dict['size'] = size
5696
      else:
5697
        # modification of disk
5698
        if 'size' in disk_dict:
5699
          raise errors.OpPrereqError("Disk size change not possible, use"
5700
                                     " grow-disk")
5701

    
5702
    if disk_addremove > 1:
5703
      raise errors.OpPrereqError("Only one disk add or remove operation"
5704
                                 " supported at a time")
5705

    
5706
    # NIC validation
5707
    nic_addremove = 0
5708
    for nic_op, nic_dict in self.op.nics:
5709
      if nic_op == constants.DDM_REMOVE:
5710
        nic_addremove += 1
5711
        continue
5712
      elif nic_op == constants.DDM_ADD:
5713
        nic_addremove += 1
5714
      else:
5715
        if not isinstance(nic_op, int):
5716
          raise errors.OpPrereqError("Invalid nic index")
5717

    
5718
      # nic_dict should be a dict
5719
      nic_ip = nic_dict.get('ip', None)
5720
      if nic_ip is not None:
5721
        if nic_ip.lower() == constants.VALUE_NONE:
5722
          nic_dict['ip'] = None
5723
        else:
5724
          if not utils.IsValidIP(nic_ip):
5725
            raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5726

    
5727
      if nic_op == constants.DDM_ADD:
5728
        nic_bridge = nic_dict.get('bridge', None)
5729
        if nic_bridge is None:
5730
          nic_dict['bridge'] = self.cfg.GetDefBridge()
5731
        nic_mac = nic_dict.get('mac', None)
5732
        if nic_mac is None:
5733
          nic_dict['mac'] = constants.VALUE_AUTO
5734

    
5735
      if 'mac' in nic_dict:
5736
        nic_mac = nic_dict['mac']
5737
        if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5738
          if not utils.IsValidMac(nic_mac):
5739
            raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5740
        if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5741
          raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5742
                                     " modifying an existing nic")
5743

    
5744
    if nic_addremove > 1:
5745
      raise errors.OpPrereqError("Only one NIC add or remove operation"
5746
                                 " supported at a time")
5747

    
5748
  def ExpandNames(self):
5749
    self._ExpandAndLockInstance()
5750
    self.needed_locks[locking.LEVEL_NODE] = []
5751
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5752

    
5753
  def DeclareLocks(self, level):
5754
    if level == locking.LEVEL_NODE:
5755
      self._LockInstancesNodes()
5756

    
5757
  def BuildHooksEnv(self):
5758
    """Build hooks env.
5759

5760
    This runs on the master, primary and secondaries.
5761

5762
    """
5763
    args = dict()
5764
    if constants.BE_MEMORY in self.be_new:
5765
      args['memory'] = self.be_new[constants.BE_MEMORY]
5766
    if constants.BE_VCPUS in self.be_new:
5767
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
5768
    # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
5769
    # information at all.
5770
    if self.op.nics:
5771
      args['nics'] = []
5772
      nic_override = dict(self.op.nics)
5773
      for idx, nic in enumerate(self.instance.nics):
5774
        if idx in nic_override:
5775
          this_nic_override = nic_override[idx]
5776
        else:
5777
          this_nic_override = {}
5778
        if 'ip' in this_nic_override:
5779
          ip = this_nic_override['ip']
5780
        else:
5781
          ip = nic.ip
5782
        if 'bridge' in this_nic_override:
5783
          bridge = this_nic_override['bridge']
5784
        else:
5785
          bridge = nic.bridge
5786
        if 'mac' in this_nic_override:
5787
          mac = this_nic_override['mac']
5788
        else:
5789
          mac = nic.mac
5790
        args['nics'].append((ip, bridge, mac))
5791
      if constants.DDM_ADD in nic_override:
5792
        ip = nic_override[constants.DDM_ADD].get('ip', None)
5793
        bridge = nic_override[constants.DDM_ADD]['bridge']
5794
        mac = nic_override[constants.DDM_ADD]['mac']
5795
        args['nics'].append((ip, bridge, mac))
5796
      elif constants.DDM_REMOVE in nic_override:
5797
        del args['nics'][-1]
5798

    
5799
    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5800
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
5801
    return env, nl, nl
5802

    
5803
  def CheckPrereq(self):
5804
    """Check prerequisites.
5805

5806
    This only checks the instance list against the existing names.
5807

5808
    """
5809
    force = self.force = self.op.force
5810

    
5811
    # checking the new params on the primary/secondary nodes
5812

    
5813
    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5814
    assert self.instance is not None, \
5815
      "Cannot retrieve locked instance %s" % self.op.instance_name
5816
    pnode = instance.primary_node
5817
    nodelist = list(instance.all_nodes)
5818

    
5819
    # hvparams processing
5820
    if self.op.hvparams:
5821
      i_hvdict = copy.deepcopy(instance.hvparams)
5822
      for key, val in self.op.hvparams.iteritems():
5823
        if val == constants.VALUE_DEFAULT:
5824
          try:
5825
            del i_hvdict[key]
5826
          except KeyError:
5827
            pass
5828
        else:
5829
          i_hvdict[key] = val
5830
      cluster = self.cfg.GetClusterInfo()
5831
      utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
5832
      hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5833
                                i_hvdict)
5834
      # local check
5835
      hypervisor.GetHypervisor(
5836
        instance.hypervisor).CheckParameterSyntax(hv_new)
5837
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5838
      self.hv_new = hv_new # the new actual values
5839
      self.hv_inst = i_hvdict # the new dict (without defaults)
5840
    else:
5841
      self.hv_new = self.hv_inst = {}
5842

    
5843
    # beparams processing
5844
    if self.op.beparams:
5845
      i_bedict = copy.deepcopy(instance.beparams)
5846
      for key, val in self.op.beparams.iteritems():
5847
        if val == constants.VALUE_DEFAULT:
5848
          try:
5849
            del i_bedict[key]
5850
          except KeyError:
5851
            pass
5852
        else:
5853
          i_bedict[key] = val
5854
      cluster = self.cfg.GetClusterInfo()
5855
      utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
5856
      be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5857
                                i_bedict)
5858
      self.be_new = be_new # the new actual values
5859
      self.be_inst = i_bedict # the new dict (without defaults)
5860
    else:
5861
      self.be_new = self.be_inst = {}
5862

    
5863
    self.warn = []
5864

    
5865
    if constants.BE_MEMORY in self.op.beparams and not self.force:
5866
      mem_check_list = [pnode]
5867
      if be_new[constants.BE_AUTO_BALANCE]:
5868
        # either we changed auto_balance to yes or it was from before
5869
        mem_check_list.extend(instance.secondary_nodes)
5870
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
5871
                                                  instance.hypervisor)
5872
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5873
                                         instance.hypervisor)
5874
      if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5875
        # Assume the primary node is unreachable and go ahead
5876
        self.warn.append("Can't get info from primary node %s" % pnode)
5877
      else:
5878
        if not instance_info.failed and instance_info.data:
5879
          current_mem = instance_info.data['memory']
5880
        else:
5881
          # Assume instance not running
5882
          # (there is a slight race condition here, but it's not very probable,
5883
          # and we have no other way to check)
5884
          current_mem = 0
5885
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5886
                    nodeinfo[pnode].data['memory_free'])
5887
        if miss_mem > 0:
5888
          raise errors.OpPrereqError("This change will prevent the instance"
5889
                                     " from starting, due to %d MB of memory"
5890
                                     " missing on its primary node" % miss_mem)
5891

    
5892
      if be_new[constants.BE_AUTO_BALANCE]:
5893
        for node, nres in nodeinfo.iteritems():
5894
          if node not in instance.secondary_nodes:
5895
            continue
5896
          if nres.failed or not isinstance(nres.data, dict):
5897
            self.warn.append("Can't get info from secondary node %s" % node)
5898
          elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5899
            self.warn.append("Not enough memory to failover instance to"
5900
                             " secondary node %s" % node)
5901

    
5902
    # NIC processing
5903
    for nic_op, nic_dict in self.op.nics:
5904
      if nic_op == constants.DDM_REMOVE:
5905
        if not instance.nics:
5906
          raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5907
        continue
5908
      if nic_op != constants.DDM_ADD:
5909
        # an existing nic
5910
        if nic_op < 0 or nic_op >= len(instance.nics):
5911
          raise errors.OpPrereqError("Invalid NIC index %s, valid values"
5912
                                     " are 0 to %d" %
5913
                                     (nic_op, len(instance.nics)))
5914
      if 'bridge' in nic_dict:
5915
        nic_bridge = nic_dict['bridge']
5916
        if nic_bridge is None:
5917
          raise errors.OpPrereqError('Cannot set the nic bridge to None')
5918
        if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
5919
          msg = ("Bridge '%s' doesn't exist on one of"
5920
                 " the instance nodes" % nic_bridge)
5921
          if self.force:
5922
            self.warn.append(msg)
5923
          else:
5924
            raise errors.OpPrereqError(msg)
5925
      if 'mac' in nic_dict:
5926
        nic_mac = nic_dict['mac']
5927
        if nic_mac is None:
5928
          raise errors.OpPrereqError('Cannot set the nic mac to None')
5929
        elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5930
          # otherwise generate the mac
5931
          nic_dict['mac'] = self.cfg.GenerateMAC()
5932
        else:
5933
          # or validate/reserve the current one
5934
          if self.cfg.IsMacInUse(nic_mac):
5935
            raise errors.OpPrereqError("MAC address %s already in use"
5936
                                       " in cluster" % nic_mac)
5937

    
5938
    # DISK processing
5939
    if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
5940
      raise errors.OpPrereqError("Disk operations not supported for"
5941
                                 " diskless instances")
5942
    for disk_op, disk_dict in self.op.disks:
5943
      if disk_op == constants.DDM_REMOVE:
5944
        if len(instance.disks) == 1:
5945
          raise errors.OpPrereqError("Cannot remove the last disk of"
5946
                                     " an instance")
5947
        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
5948
        ins_l = ins_l[pnode]
5949
        if ins_l.failed or not isinstance(ins_l.data, list):
5950
          raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
5951
        if instance.name in ins_l.data:
5952
          raise errors.OpPrereqError("Instance is running, can't remove"
5953
                                     " disks.")
5954

    
5955
      if (disk_op == constants.DDM_ADD and
5956
          len(instance.nics) >= constants.MAX_DISKS):
5957
        raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
5958
                                   " add more" % constants.MAX_DISKS)
5959
      if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
5960
        # an existing disk
5961
        if disk_op < 0 or disk_op >= len(instance.disks):
5962
          raise errors.OpPrereqError("Invalid disk index %s, valid values"
5963
                                     " are 0 to %d" %
5964
                                     (disk_op, len(instance.disks)))
5965

    
5966
    return
5967

    
5968
  def Exec(self, feedback_fn):
5969
    """Modifies an instance.
5970

5971
    All parameters take effect only at the next restart of the instance.
5972

5973
    """
5974
    # Process here the warnings from CheckPrereq, as we don't have a
5975
    # feedback_fn there.
5976
    for warn in self.warn:
5977
      feedback_fn("WARNING: %s" % warn)
5978

    
5979
    result = []
5980
    instance = self.instance
5981
    # disk changes
5982
    for disk_op, disk_dict in self.op.disks:
5983
      if disk_op == constants.DDM_REMOVE:
5984
        # remove the last disk
5985
        device = instance.disks.pop()
5986
        device_idx = len(instance.disks)
5987
        for node, disk in device.ComputeNodeTree(instance.primary_node):
5988
          self.cfg.SetDiskID(disk, node)
5989
          msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
5990
          if msg:
5991
            self.LogWarning("Could not remove disk/%d on node %s: %s,"
5992
                            " continuing anyway", device_idx, node, msg)
5993
        result.append(("disk/%d" % device_idx, "remove"))
5994
      elif disk_op == constants.DDM_ADD:
5995
        # add a new disk
5996
        if instance.disk_template == constants.DT_FILE:
5997
          file_driver, file_path = instance.disks[0].logical_id
5998
          file_path = os.path.dirname(file_path)
5999
        else:
6000
          file_driver = file_path = None
6001
        disk_idx_base = len(instance.disks)
6002
        new_disk = _GenerateDiskTemplate(self,
6003
                                         instance.disk_template,
6004
                                         instance.name, instance.primary_node,
6005
                                         instance.secondary_nodes,
6006
                                         [disk_dict],
6007
                                         file_path,
6008
                                         file_driver,
6009
                                         disk_idx_base)[0]
6010
        instance.disks.append(new_disk)
6011
        info = _GetInstanceInfoText(instance)
6012

    
6013
        logging.info("Creating volume %s for instance %s",
6014
                     new_disk.iv_name, instance.name)
6015
        # Note: this needs to be kept in sync with _CreateDisks
6016
        #HARDCODE
6017
        for node in instance.all_nodes:
6018
          f_create = node == instance.primary_node
6019
          try:
6020
            _CreateBlockDev(self, node, instance, new_disk,
6021
                            f_create, info, f_create)
6022
          except errors.OpExecError, err:
6023
            self.LogWarning("Failed to create volume %s (%s) on"
6024
                            " node %s: %s",
6025
                            new_disk.iv_name, new_disk, node, err)
6026
        result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6027
                       (new_disk.size, new_disk.mode)))
6028
      else:
6029
        # change a given disk
6030
        instance.disks[disk_op].mode = disk_dict['mode']
6031
        result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6032
    # NIC changes
6033
    for nic_op, nic_dict in self.op.nics:
6034
      if nic_op == constants.DDM_REMOVE:
6035
        # remove the last nic
6036
        del instance.nics[-1]
6037
        result.append(("nic.%d" % len(instance.nics), "remove"))
6038
      elif nic_op == constants.DDM_ADD:
6039
        # mac and bridge should be set, by now
6040
        mac = nic_dict['mac']
6041
        bridge = nic_dict['bridge']
6042
        new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
6043
                              bridge=bridge)
6044
        instance.nics.append(new_nic)
6045
        result.append(("nic.%d" % (len(instance.nics) - 1),
6046
                       "add:mac=%s,ip=%s,bridge=%s" %
6047
                       (new_nic.mac, new_nic.ip, new_nic.bridge)))
6048
      else:
6049
        # change a given nic
6050
        for key in 'mac', 'ip', 'bridge':
6051
          if key in nic_dict:
6052
            setattr(instance.nics[nic_op], key, nic_dict[key])
6053
            result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
6054

    
6055
    # hvparams changes
6056
    if self.op.hvparams:
6057
      instance.hvparams = self.hv_inst
6058
      for key, val in self.op.hvparams.iteritems():
6059
        result.append(("hv/%s" % key, val))
6060

    
6061
    # beparams changes
6062
    if self.op.beparams:
6063
      instance.beparams = self.be_inst
6064
      for key, val in self.op.beparams.iteritems():
6065
        result.append(("be/%s" % key, val))
6066

    
6067
    self.cfg.Update(instance)
6068

    
6069
    return result
6070

    
6071

    
6072
class LUQueryExports(NoHooksLU):
6073
  """Query the exports list
6074

6075
  """
6076
  _OP_REQP = ['nodes']
6077
  REQ_BGL = False
6078

    
6079
  def ExpandNames(self):
6080
    self.needed_locks = {}
6081
    self.share_locks[locking.LEVEL_NODE] = 1
6082
    if not self.op.nodes:
6083
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6084
    else:
6085
      self.needed_locks[locking.LEVEL_NODE] = \
6086
        _GetWantedNodes(self, self.op.nodes)
6087

    
6088
  def CheckPrereq(self):
6089
    """Check prerequisites.
6090

6091
    """
6092
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6093

    
6094
  def Exec(self, feedback_fn):
6095
    """Compute the list of all the exported system images.
6096

6097
    @rtype: dict
6098
    @return: a dictionary with the structure node->(export-list)
6099
        where export-list is a list of the instances exported on
6100
        that node.
6101

6102
    """
6103
    rpcresult = self.rpc.call_export_list(self.nodes)
6104
    result = {}
6105
    for node in rpcresult:
6106
      if rpcresult[node].failed:
6107
        result[node] = False
6108
      else:
6109
        result[node] = rpcresult[node].data
6110

    
6111
    return result
6112

    
6113

    
6114
class LUExportInstance(LogicalUnit):
6115
  """Export an instance to an image in the cluster.
6116

6117
  """
6118
  HPATH = "instance-export"
6119
  HTYPE = constants.HTYPE_INSTANCE
6120
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
6121
  REQ_BGL = False
6122

    
6123
  def ExpandNames(self):
6124
    self._ExpandAndLockInstance()
6125
    # FIXME: lock only instance primary and destination node
6126
    #
6127
    # Sad but true, for now we have do lock all nodes, as we don't know where
6128
    # the previous export might be, and and in this LU we search for it and
6129
    # remove it from its current node. In the future we could fix this by:
6130
    #  - making a tasklet to search (share-lock all), then create the new one,
6131
    #    then one to remove, after
6132
    #  - removing the removal operation altoghether
6133
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6134

    
6135
  def DeclareLocks(self, level):
6136
    """Last minute lock declaration."""
6137
    # All nodes are locked anyway, so nothing to do here.
6138

    
6139
  def BuildHooksEnv(self):
6140
    """Build hooks env.
6141

6142
    This will run on the master, primary node and target node.
6143

6144
    """
6145
    env = {
6146
      "EXPORT_NODE": self.op.target_node,
6147
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6148
      }
6149
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6150
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6151
          self.op.target_node]
6152
    return env, nl, nl
6153

    
6154
  def CheckPrereq(self):
6155
    """Check prerequisites.
6156

6157
    This checks that the instance and node names are valid.
6158

6159
    """
6160
    instance_name = self.op.instance_name
6161
    self.instance = self.cfg.GetInstanceInfo(instance_name)
6162
    assert self.instance is not None, \
6163
          "Cannot retrieve locked instance %s" % self.op.instance_name
6164
    _CheckNodeOnline(self, self.instance.primary_node)
6165

    
6166
    self.dst_node = self.cfg.GetNodeInfo(
6167
      self.cfg.ExpandNodeName(self.op.target_node))
6168

    
6169
    if self.dst_node is None:
6170
      # This is wrong node name, not a non-locked node
6171
      raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6172
    _CheckNodeOnline(self, self.dst_node.name)
6173
    _CheckNodeNotDrained(self, self.dst_node.name)
6174

    
6175
    # instance disk type verification
6176
    for disk in self.instance.disks:
6177
      if disk.dev_type == constants.LD_FILE:
6178
        raise errors.OpPrereqError("Export not supported for instances with"
6179
                                   " file-based disks")
6180

    
6181
  def Exec(self, feedback_fn):
6182
    """Export an instance to an image in the cluster.
6183

6184
    """
6185
    instance = self.instance
6186
    dst_node = self.dst_node
6187
    src_node = instance.primary_node
6188
    if self.op.shutdown:
6189
      # shutdown the instance, but not the disks
6190
      result = self.rpc.call_instance_shutdown(src_node, instance)
6191
      msg = result.RemoteFailMsg()
6192
      if msg:
6193
        raise errors.OpExecError("Could not shutdown instance %s on"
6194
                                 " node %s: %s" %
6195
                                 (instance.name, src_node, msg))
6196

    
6197
    vgname = self.cfg.GetVGName()
6198

    
6199
    snap_disks = []
6200

    
6201
    # set the disks ID correctly since call_instance_start needs the
6202
    # correct drbd minor to create the symlinks
6203
    for disk in instance.disks:
6204
      self.cfg.SetDiskID(disk, src_node)
6205

    
6206
    try:
6207
      for disk in instance.disks:
6208
        # new_dev_name will be a snapshot of an lvm leaf of the one we passed
6209
        new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
6210
        if new_dev_name.failed or not new_dev_name.data:
6211
          self.LogWarning("Could not snapshot block device %s on node %s",
6212
                          disk.logical_id[1], src_node)
6213
          snap_disks.append(False)
6214
        else:
6215
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6216
                                 logical_id=(vgname, new_dev_name.data),
6217
                                 physical_id=(vgname, new_dev_name.data),
6218
                                 iv_name=disk.iv_name)
6219
          snap_disks.append(new_dev)
6220

    
6221
    finally:
6222
      if self.op.shutdown and instance.admin_up:
6223
        result = self.rpc.call_instance_start(src_node, instance)
6224
        msg = result.RemoteFailMsg()
6225
        if msg:
6226
          _ShutdownInstanceDisks(self, instance)
6227
          raise errors.OpExecError("Could not start instance: %s" % msg)
6228

    
6229
    # TODO: check for size
6230

    
6231
    cluster_name = self.cfg.GetClusterName()
6232
    for idx, dev in enumerate(snap_disks):
6233
      if dev:
6234
        result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6235
                                               instance, cluster_name, idx)
6236
        if result.failed or not result.data:
6237
          self.LogWarning("Could not export block device %s from node %s to"
6238
                          " node %s", dev.logical_id[1], src_node,
6239
                          dst_node.name)
6240
        msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6241
        if msg:
6242
          self.LogWarning("Could not remove snapshot block device %s from node"
6243
                          " %s: %s", dev.logical_id[1], src_node, msg)
6244

    
6245
    result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6246
    if result.failed or not result.data:
6247
      self.LogWarning("Could not finalize export for instance %s on node %s",
6248
                      instance.name, dst_node.name)
6249

    
6250
    nodelist = self.cfg.GetNodeList()
6251
    nodelist.remove(dst_node.name)
6252

    
6253
    # on one-node clusters nodelist will be empty after the removal
6254
    # if we proceed the backup would be removed because OpQueryExports
6255
    # substitutes an empty list with the full cluster node list.
6256
    if nodelist:
6257
      exportlist = self.rpc.call_export_list(nodelist)
6258
      for node in exportlist:
6259
        if exportlist[node].failed:
6260
          continue
6261
        if instance.name in exportlist[node].data:
6262
          if not self.rpc.call_export_remove(node, instance.name):
6263
            self.LogWarning("Could not remove older export for instance %s"
6264
                            " on node %s", instance.name, node)
6265

    
6266

    
6267
class LURemoveExport(NoHooksLU):
6268
  """Remove exports related to the named instance.
6269

6270
  """
6271
  _OP_REQP = ["instance_name"]
6272
  REQ_BGL = False
6273

    
6274
  def ExpandNames(self):
6275
    self.needed_locks = {}
6276
    # We need all nodes to be locked in order for RemoveExport to work, but we
6277
    # don't need to lock the instance itself, as nothing will happen to it (and
6278
    # we can remove exports also for a removed instance)
6279
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6280

    
6281
  def CheckPrereq(self):
6282
    """Check prerequisites.
6283
    """
6284
    pass
6285

    
6286
  def Exec(self, feedback_fn):
6287
    """Remove any export.
6288

6289
    """
6290
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6291
    # If the instance was not found we'll try with the name that was passed in.
6292
    # This will only work if it was an FQDN, though.
6293
    fqdn_warn = False
6294
    if not instance_name:
6295
      fqdn_warn = True
6296
      instance_name = self.op.instance_name
6297

    
6298
    exportlist = self.rpc.call_export_list(self.acquired_locks[
6299
      locking.LEVEL_NODE])
6300
    found = False
6301
    for node in exportlist:
6302
      if exportlist[node].failed:
6303
        self.LogWarning("Failed to query node %s, continuing" % node)
6304
        continue
6305
      if instance_name in exportlist[node].data:
6306
        found = True
6307
        result = self.rpc.call_export_remove(node, instance_name)
6308
        if result.failed or not result.data:
6309
          logging.error("Could not remove export for instance %s"
6310
                        " on node %s", instance_name, node)
6311

    
6312
    if fqdn_warn and not found:
6313
      feedback_fn("Export not found. If trying to remove an export belonging"
6314
                  " to a deleted instance please use its Fully Qualified"
6315
                  " Domain Name.")
6316

    
6317

    
6318
class TagsLU(NoHooksLU):
6319
  """Generic tags LU.
6320

6321
  This is an abstract class which is the parent of all the other tags LUs.
6322

6323
  """
6324

    
6325
  def ExpandNames(self):
6326
    self.needed_locks = {}
6327
    if self.op.kind == constants.TAG_NODE:
6328
      name = self.cfg.ExpandNodeName(self.op.name)
6329
      if name is None:
6330
        raise errors.OpPrereqError("Invalid node name (%s)" %
6331
                                   (self.op.name,))
6332
      self.op.name = name
6333
      self.needed_locks[locking.LEVEL_NODE] = name
6334
    elif self.op.kind == constants.TAG_INSTANCE:
6335
      name = self.cfg.ExpandInstanceName(self.op.name)
6336
      if name is None:
6337
        raise errors.OpPrereqError("Invalid instance name (%s)" %
6338
                                   (self.op.name,))
6339
      self.op.name = name
6340
      self.needed_locks[locking.LEVEL_INSTANCE] = name
6341

    
6342
  def CheckPrereq(self):
6343
    """Check prerequisites.
6344

6345
    """
6346
    if self.op.kind == constants.TAG_CLUSTER:
6347
      self.target = self.cfg.GetClusterInfo()
6348
    elif self.op.kind == constants.TAG_NODE:
6349
      self.target = self.cfg.GetNodeInfo(self.op.name)
6350
    elif self.op.kind == constants.TAG_INSTANCE:
6351
      self.target = self.cfg.GetInstanceInfo(self.op.name)
6352
    else:
6353
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6354
                                 str(self.op.kind))
6355

    
6356

    
6357
class LUGetTags(TagsLU):
6358
  """Returns the tags of a given object.
6359

6360
  """
6361
  _OP_REQP = ["kind", "name"]
6362
  REQ_BGL = False
6363

    
6364
  def Exec(self, feedback_fn):
6365
    """Returns the tag list.
6366

6367
    """
6368
    return list(self.target.GetTags())
6369

    
6370

    
6371
class LUSearchTags(NoHooksLU):
6372
  """Searches the tags for a given pattern.
6373

6374
  """
6375
  _OP_REQP = ["pattern"]
6376
  REQ_BGL = False
6377

    
6378
  def ExpandNames(self):
6379
    self.needed_locks = {}
6380

    
6381
  def CheckPrereq(self):
6382
    """Check prerequisites.
6383

6384
    This checks the pattern passed for validity by compiling it.
6385

6386
    """
6387
    try:
6388
      self.re = re.compile(self.op.pattern)
6389
    except re.error, err:
6390
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6391
                                 (self.op.pattern, err))
6392

    
6393
  def Exec(self, feedback_fn):
6394
    """Returns the tag list.
6395

6396
    """
6397
    cfg = self.cfg
6398
    tgts = [("/cluster", cfg.GetClusterInfo())]
6399
    ilist = cfg.GetAllInstancesInfo().values()
6400
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6401
    nlist = cfg.GetAllNodesInfo().values()
6402
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6403
    results = []
6404
    for path, target in tgts:
6405
      for tag in target.GetTags():
6406
        if self.re.search(tag):
6407
          results.append((path, tag))
6408
    return results
6409

    
6410

    
6411
class LUAddTags(TagsLU):
6412
  """Sets a tag on a given object.
6413

6414
  """
6415
  _OP_REQP = ["kind", "name", "tags"]
6416
  REQ_BGL = False
6417

    
6418
  def CheckPrereq(self):
6419
    """Check prerequisites.
6420

6421
    This checks the type and length of the tag name and value.
6422

6423
    """
6424
    TagsLU.CheckPrereq(self)
6425
    for tag in self.op.tags:
6426
      objects.TaggableObject.ValidateTag(tag)
6427

    
6428
  def Exec(self, feedback_fn):
6429
    """Sets the tag.
6430

6431
    """
6432
    try:
6433
      for tag in self.op.tags:
6434
        self.target.AddTag(tag)
6435
    except errors.TagError, err:
6436
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
6437
    try:
6438
      self.cfg.Update(self.target)
6439
    except errors.ConfigurationError:
6440
      raise errors.OpRetryError("There has been a modification to the"
6441
                                " config file and the operation has been"
6442
                                " aborted. Please retry.")
6443

    
6444

    
6445
class LUDelTags(TagsLU):
6446
  """Delete a list of tags from a given object.
6447

6448
  """
6449
  _OP_REQP = ["kind", "name", "tags"]
6450
  REQ_BGL = False
6451

    
6452
  def CheckPrereq(self):
6453
    """Check prerequisites.
6454

6455
    This checks that we have the given tag.
6456

6457
    """
6458
    TagsLU.CheckPrereq(self)
6459
    for tag in self.op.tags:
6460
      objects.TaggableObject.ValidateTag(tag)
6461
    del_tags = frozenset(self.op.tags)
6462
    cur_tags = self.target.GetTags()
6463
    if not del_tags <= cur_tags:
6464
      diff_tags = del_tags - cur_tags
6465
      diff_names = ["'%s'" % tag for tag in diff_tags]
6466
      diff_names.sort()
6467
      raise errors.OpPrereqError("Tag(s) %s not found" %
6468
                                 (",".join(diff_names)))
6469

    
6470
  def Exec(self, feedback_fn):
6471
    """Remove the tag from the object.
6472

6473
    """
6474
    for tag in self.op.tags:
6475
      self.target.RemoveTag(tag)
6476
    try:
6477
      self.cfg.Update(self.target)
6478
    except errors.ConfigurationError:
6479
      raise errors.OpRetryError("There has been a modification to the"
6480
                                " config file and the operation has been"
6481
                                " aborted. Please retry.")
6482

    
6483

    
6484
class LUTestDelay(NoHooksLU):
6485
  """Sleep for a specified amount of time.
6486

6487
  This LU sleeps on the master and/or nodes for a specified amount of
6488
  time.
6489

6490
  """
6491
  _OP_REQP = ["duration", "on_master", "on_nodes"]
6492
  REQ_BGL = False
6493

    
6494
  def ExpandNames(self):
6495
    """Expand names and set required locks.
6496

6497
    This expands the node list, if any.
6498

6499
    """
6500
    self.needed_locks = {}
6501
    if self.op.on_nodes:
6502
      # _GetWantedNodes can be used here, but is not always appropriate to use
6503
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6504
      # more information.
6505
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6506
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6507

    
6508
  def CheckPrereq(self):
6509
    """Check prerequisites.
6510

6511
    """
6512

    
6513
  def Exec(self, feedback_fn):
6514
    """Do the actual sleep.
6515

6516
    """
6517
    if self.op.on_master:
6518
      if not utils.TestDelay(self.op.duration):
6519
        raise errors.OpExecError("Error during master delay test")
6520
    if self.op.on_nodes:
6521
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6522
      if not result:
6523
        raise errors.OpExecError("Complete failure from rpc call")
6524
      for node, node_result in result.items():
6525
        node_result.Raise()
6526
        if not node_result.data:
6527
          raise errors.OpExecError("Failure during rpc call to node %s,"
6528
                                   " result: %s" % (node, node_result.data))
6529

    
6530

    
6531
class IAllocator(object):
6532
  """IAllocator framework.
6533

6534
  An IAllocator instance has three sets of attributes:
6535
    - cfg that is needed to query the cluster
6536
    - input data (all members of the _KEYS class attribute are required)
6537
    - four buffer attributes (in|out_data|text), that represent the
6538
      input (to the external script) in text and data structure format,
6539
      and the output from it, again in two formats
6540
    - the result variables from the script (success, info, nodes) for
6541
      easy usage
6542

6543
  """
6544
  _ALLO_KEYS = [
6545
    "mem_size", "disks", "disk_template",
6546
    "os", "tags", "nics", "vcpus", "hypervisor",
6547
    ]
6548
  _RELO_KEYS = [
6549
    "relocate_from",
6550
    ]
6551

    
6552
  def __init__(self, lu, mode, name, **kwargs):
6553
    self.lu = lu
6554
    # init buffer variables
6555
    self.in_text = self.out_text = self.in_data = self.out_data = None
6556
    # init all input fields so that pylint is happy
6557
    self.mode = mode
6558
    self.name = name
6559
    self.mem_size = self.disks = self.disk_template = None
6560
    self.os = self.tags = self.nics = self.vcpus = None
6561
    self.hypervisor = None
6562
    self.relocate_from = None
6563
    # computed fields
6564
    self.required_nodes = None
6565
    # init result fields
6566
    self.success = self.info = self.nodes = None
6567
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6568
      keyset = self._ALLO_KEYS
6569
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6570
      keyset = self._RELO_KEYS
6571
    else:
6572
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6573
                                   " IAllocator" % self.mode)
6574
    for key in kwargs:
6575
      if key not in keyset:
6576
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
6577
                                     " IAllocator" % key)
6578
      setattr(self, key, kwargs[key])
6579
    for key in keyset:
6580
      if key not in kwargs:
6581
        raise errors.ProgrammerError("Missing input parameter '%s' to"
6582
                                     " IAllocator" % key)
6583
    self._BuildInputData()
6584

    
6585
  def _ComputeClusterData(self):
6586
    """Compute the generic allocator input data.
6587

6588
    This is the data that is independent of the actual operation.
6589

6590
    """
6591
    cfg = self.lu.cfg
6592
    cluster_info = cfg.GetClusterInfo()
6593
    # cluster data
6594
    data = {
6595
      "version": constants.IALLOCATOR_VERSION,
6596
      "cluster_name": cfg.GetClusterName(),
6597
      "cluster_tags": list(cluster_info.GetTags()),
6598
      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6599
      # we don't have job IDs
6600
      }
6601
    iinfo = cfg.GetAllInstancesInfo().values()
6602
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6603

    
6604
    # node data
6605
    node_results = {}
6606
    node_list = cfg.GetNodeList()
6607

    
6608
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6609
      hypervisor_name = self.hypervisor
6610
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6611
      hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6612

    
6613
    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6614
                                           hypervisor_name)
6615
    node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6616
                       cluster_info.enabled_hypervisors)
6617
    for nname, nresult in node_data.items():
6618
      # first fill in static (config-based) values
6619
      ninfo = cfg.GetNodeInfo(nname)
6620
      pnr = {
6621
        "tags": list(ninfo.GetTags()),
6622
        "primary_ip": ninfo.primary_ip,
6623
        "secondary_ip": ninfo.secondary_ip,
6624
        "offline": ninfo.offline,
6625
        "drained": ninfo.drained,
6626
        "master_candidate": ninfo.master_candidate,
6627
        }
6628

    
6629
      if not ninfo.offline:
6630
        nresult.Raise()
6631
        if not isinstance(nresult.data, dict):
6632
          raise errors.OpExecError("Can't get data for node %s" % nname)
6633
        remote_info = nresult.data
6634
        for attr in ['memory_total', 'memory_free', 'memory_dom0',
6635
                     'vg_size', 'vg_free', 'cpu_total']:
6636
          if attr not in remote_info:
6637
            raise errors.OpExecError("Node '%s' didn't return attribute"
6638
                                     " '%s'" % (nname, attr))
6639
          try:
6640
            remote_info[attr] = int(remote_info[attr])
6641
          except ValueError, err:
6642
            raise errors.OpExecError("Node '%s' returned invalid value"
6643
                                     " for '%s': %s" % (nname, attr, err))
6644
        # compute memory used by primary instances
6645
        i_p_mem = i_p_up_mem = 0
6646
        for iinfo, beinfo in i_list:
6647
          if iinfo.primary_node == nname:
6648
            i_p_mem += beinfo[constants.BE_MEMORY]
6649
            if iinfo.name not in node_iinfo[nname].data:
6650
              i_used_mem = 0
6651
            else:
6652
              i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6653
            i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6654
            remote_info['memory_free'] -= max(0, i_mem_diff)
6655

    
6656
            if iinfo.admin_up:
6657
              i_p_up_mem += beinfo[constants.BE_MEMORY]
6658

    
6659
        # compute memory used by instances
6660
        pnr_dyn = {
6661
          "total_memory": remote_info['memory_total'],
6662
          "reserved_memory": remote_info['memory_dom0'],
6663
          "free_memory": remote_info['memory_free'],
6664
          "total_disk": remote_info['vg_size'],
6665
          "free_disk": remote_info['vg_free'],
6666
          "total_cpus": remote_info['cpu_total'],
6667
          "i_pri_memory": i_p_mem,
6668
          "i_pri_up_memory": i_p_up_mem,
6669
          }
6670
        pnr.update(pnr_dyn)
6671

    
6672
      node_results[nname] = pnr
6673
    data["nodes"] = node_results
6674

    
6675
    # instance data
6676
    instance_data = {}
6677
    for iinfo, beinfo in i_list:
6678
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6679
                  for n in iinfo.nics]
6680
      pir = {
6681
        "tags": list(iinfo.GetTags()),
6682
        "admin_up": iinfo.admin_up,
6683
        "vcpus": beinfo[constants.BE_VCPUS],
6684
        "memory": beinfo[constants.BE_MEMORY],
6685
        "os": iinfo.os,
6686
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6687
        "nics": nic_data,
6688
        "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6689
        "disk_template": iinfo.disk_template,
6690
        "hypervisor": iinfo.hypervisor,
6691
        }
6692
      instance_data[iinfo.name] = pir
6693

    
6694
    data["instances"] = instance_data
6695

    
6696
    self.in_data = data
6697

    
6698
  def _AddNewInstance(self):
6699
    """Add new instance data to allocator structure.
6700

6701
    This in combination with _AllocatorGetClusterData will create the
6702
    correct structure needed as input for the allocator.
6703

6704
    The checks for the completeness of the opcode must have already been
6705
    done.
6706

6707
    """
6708
    data = self.in_data
6709

    
6710
    disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6711

    
6712
    if self.disk_template in constants.DTS_NET_MIRROR:
6713
      self.required_nodes = 2
6714
    else:
6715
      self.required_nodes = 1
6716
    request = {
6717
      "type": "allocate",
6718
      "name": self.name,
6719
      "disk_template": self.disk_template,
6720
      "tags": self.tags,
6721
      "os": self.os,
6722
      "vcpus": self.vcpus,
6723
      "memory": self.mem_size,
6724
      "disks": self.disks,
6725
      "disk_space_total": disk_space,
6726
      "nics": self.nics,
6727
      "required_nodes": self.required_nodes,
6728
      }
6729
    data["request"] = request
6730

    
6731
  def _AddRelocateInstance(self):
6732
    """Add relocate instance data to allocator structure.
6733

6734
    This in combination with _IAllocatorGetClusterData will create the
6735
    correct structure needed as input for the allocator.
6736

6737
    The checks for the completeness of the opcode must have already been
6738
    done.
6739

6740
    """
6741
    instance = self.lu.cfg.GetInstanceInfo(self.name)
6742
    if instance is None:
6743
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
6744
                                   " IAllocator" % self.name)
6745

    
6746
    if instance.disk_template not in constants.DTS_NET_MIRROR:
6747
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6748

    
6749
    if len(instance.secondary_nodes) != 1:
6750
      raise errors.OpPrereqError("Instance has not exactly one secondary node")
6751

    
6752
    self.required_nodes = 1
6753
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
6754
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6755

    
6756
    request = {
6757
      "type": "relocate",
6758
      "name": self.name,
6759
      "disk_space_total": disk_space,
6760
      "required_nodes": self.required_nodes,
6761
      "relocate_from": self.relocate_from,
6762
      }
6763
    self.in_data["request"] = request
6764

    
6765
  def _BuildInputData(self):
6766
    """Build input data structures.
6767

6768
    """
6769
    self._ComputeClusterData()
6770

    
6771
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6772
      self._AddNewInstance()
6773
    else:
6774
      self._AddRelocateInstance()
6775

    
6776
    self.in_text = serializer.Dump(self.in_data)
6777

    
6778
  def Run(self, name, validate=True, call_fn=None):
6779
    """Run an instance allocator and return the results.
6780

6781
    """
6782
    if call_fn is None:
6783
      call_fn = self.lu.rpc.call_iallocator_runner
6784
    data = self.in_text
6785

    
6786
    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6787
    result.Raise()
6788

    
6789
    if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6790
      raise errors.OpExecError("Invalid result from master iallocator runner")
6791

    
6792
    rcode, stdout, stderr, fail = result.data
6793

    
6794
    if rcode == constants.IARUN_NOTFOUND:
6795
      raise errors.OpExecError("Can't find allocator '%s'" % name)
6796
    elif rcode == constants.IARUN_FAILURE:
6797
      raise errors.OpExecError("Instance allocator call failed: %s,"
6798
                               " output: %s" % (fail, stdout+stderr))
6799
    self.out_text = stdout
6800
    if validate:
6801
      self._ValidateResult()
6802

    
6803
  def _ValidateResult(self):
6804
    """Process the allocator results.
6805

6806
    This will process and if successful save the result in
6807
    self.out_data and the other parameters.
6808

6809
    """
6810
    try:
6811
      rdict = serializer.Load(self.out_text)
6812
    except Exception, err:
6813
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6814

    
6815
    if not isinstance(rdict, dict):
6816
      raise errors.OpExecError("Can't parse iallocator results: not a dict")
6817

    
6818
    for key in "success", "info", "nodes":
6819
      if key not in rdict:
6820
        raise errors.OpExecError("Can't parse iallocator results:"
6821
                                 " missing key '%s'" % key)
6822
      setattr(self, key, rdict[key])
6823

    
6824
    if not isinstance(rdict["nodes"], list):
6825
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6826
                               " is not a list")
6827
    self.out_data = rdict
6828

    
6829

    
6830
class LUTestAllocator(NoHooksLU):
6831
  """Run allocator tests.
6832

6833
  This LU runs the allocator tests
6834

6835
  """
6836
  _OP_REQP = ["direction", "mode", "name"]
6837

    
6838
  def CheckPrereq(self):
6839
    """Check prerequisites.
6840

6841
    This checks the opcode parameters depending on the director and mode test.
6842

6843
    """
6844
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6845
      for attr in ["name", "mem_size", "disks", "disk_template",
6846
                   "os", "tags", "nics", "vcpus"]:
6847
        if not hasattr(self.op, attr):
6848
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6849
                                     attr)
6850
      iname = self.cfg.ExpandInstanceName(self.op.name)
6851
      if iname is not None:
6852
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6853
                                   iname)
6854
      if not isinstance(self.op.nics, list):
6855
        raise errors.OpPrereqError("Invalid parameter 'nics'")
6856
      for row in self.op.nics:
6857
        if (not isinstance(row, dict) or
6858
            "mac" not in row or
6859
            "ip" not in row or
6860
            "bridge" not in row):
6861
          raise errors.OpPrereqError("Invalid contents of the"
6862
                                     " 'nics' parameter")
6863
      if not isinstance(self.op.disks, list):
6864
        raise errors.OpPrereqError("Invalid parameter 'disks'")
6865
      for row in self.op.disks:
6866
        if (not isinstance(row, dict) or
6867
            "size" not in row or
6868
            not isinstance(row["size"], int) or
6869
            "mode" not in row or
6870
            row["mode"] not in ['r', 'w']):
6871
          raise errors.OpPrereqError("Invalid contents of the"
6872
                                     " 'disks' parameter")
6873
      if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
6874
        self.op.hypervisor = self.cfg.GetHypervisorType()
6875
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
6876
      if not hasattr(self.op, "name"):
6877
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
6878
      fname = self.cfg.ExpandInstanceName(self.op.name)
6879
      if fname is None:
6880
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
6881
                                   self.op.name)
6882
      self.op.name = fname
6883
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
6884
    else:
6885
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
6886
                                 self.op.mode)
6887

    
6888
    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
6889
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
6890
        raise errors.OpPrereqError("Missing allocator name")
6891
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
6892
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
6893
                                 self.op.direction)
6894

    
6895
  def Exec(self, feedback_fn):
6896
    """Run the allocator test.
6897

6898
    """
6899
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6900
      ial = IAllocator(self,
6901
                       mode=self.op.mode,
6902
                       name=self.op.name,
6903
                       mem_size=self.op.mem_size,
6904
                       disks=self.op.disks,
6905
                       disk_template=self.op.disk_template,
6906
                       os=self.op.os,
6907
                       tags=self.op.tags,
6908
                       nics=self.op.nics,
6909
                       vcpus=self.op.vcpus,
6910
                       hypervisor=self.op.hypervisor,
6911
                       )
6912
    else:
6913
      ial = IAllocator(self,
6914
                       mode=self.op.mode,
6915
                       name=self.op.name,
6916
                       relocate_from=list(self.relocate_from),
6917
                       )
6918

    
6919
    if self.op.direction == constants.IALLOCATOR_DIR_IN:
6920
      result = ial.in_text
6921
    else:
6922
      ial.Run(self.op.allocator, validate=False)
6923
      result = ial.out_text
6924
    return result