lib/cmdlib.py @ 77031881

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import sha
29
import time
30
import tempfile
31
import re
32
import platform
33
import logging
34
import copy
35
import random
36

    
37
from ganeti import ssh
38
from ganeti import utils
39
from ganeti import errors
40
from ganeti import hypervisor
41
from ganeti import locking
42
from ganeti import constants
43
from ganeti import objects
44
from ganeti import opcodes
45
from ganeti import serializer
46
from ganeti import ssconf
47

    
48

    
49
class LogicalUnit(object):
50
  """Logical Unit base class.
51

52
  Subclasses must follow these rules:
53
    - implement ExpandNames
54
    - implement CheckPrereq
55
    - implement Exec
56
    - implement BuildHooksEnv
57
    - redefine HPATH and HTYPE
58
    - optionally redefine their run requirements:
59
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60

61
  Note that all commands require root permissions.
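
  Example (an illustrative, minimal sketch only; LUNoop is a hypothetical
  class name, not part of this module)::

    class LUNoop(LogicalUnit):
      HPATH = None
      HTYPE = None
      _OP_REQP = []
      REQ_BGL = False

      def ExpandNames(self):
        self.needed_locks = {}

      def CheckPrereq(self):
        pass

      def Exec(self, feedback_fn):
        feedback_fn("nothing to do")

      def BuildHooksEnv(self):
        return {}, [], []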
62

63
  """
64
  HPATH = None
65
  HTYPE = None
66
  _OP_REQP = []
67
  REQ_BGL = True
68

    
69
  def __init__(self, processor, op, context, rpc):
70
    """Constructor for LogicalUnit.
71

72
    This needs to be overridden in derived classes in order to check op
73
    validity.
74

75
    """
76
    self.proc = processor
77
    self.op = op
78
    self.cfg = context.cfg
79
    self.context = context
80
    self.rpc = rpc
81
    # Dicts used to declare locking needs to mcpu
82
    self.needed_locks = None
83
    self.acquired_locks = {}
84
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85
    self.add_locks = {}
86
    self.remove_locks = {}
87
    # Used to force good behavior when calling helper functions
88
    self.recalculate_locks = {}
89
    self.__ssh = None
90
    # logging
91
    self.LogWarning = processor.LogWarning
92
    self.LogInfo = processor.LogInfo
93

    
94
    for attr_name in self._OP_REQP:
95
      attr_val = getattr(op, attr_name, None)
96
      if attr_val is None:
97
        raise errors.OpPrereqError("Required parameter '%s' missing" %
98
                                   attr_name)
99
    self.CheckArguments()
100

    
101
  def __GetSSH(self):
102
    """Returns the SshRunner object
103

104
    """
105
    if not self.__ssh:
106
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
107
    return self.__ssh
108

    
109
  ssh = property(fget=__GetSSH)
110

    
111
  def CheckArguments(self):
112
    """Check syntactic validity for the opcode arguments.
113

114
    This method is for doing a simple syntactic check and ensuring the
115
    validity of opcode parameters, without any cluster-related
116
    checks. While the same can be accomplished in ExpandNames and/or
117
    CheckPrereq, doing it separately is better because:
118

119
      - ExpandNames is left as purely a lock-related function
120
      - CheckPrereq is run after we have acquired locks (and possibly
121
        waited for them)
122

123
    The function is allowed to change the self.op attribute so that
124
    later methods need not worry about missing parameters.
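
    Example (an illustrative sketch; the "force" attribute used below is
    hypothetical)::

      def CheckArguments(self):
        if not hasattr(self.op, "force"):
          self.op.force = False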
125

126
    """
127
    pass
128

    
129
  def ExpandNames(self):
130
    """Expand names for this LU.
131

132
    This method is called before starting to execute the opcode, and it should
133
    update all the parameters of the opcode to their canonical form (e.g. a
134
    short node name must be fully expanded after this method has successfully
135
    completed). This way locking, hooks, logging, ecc. can work correctly.
136

137
    LUs which implement this method must also populate the self.needed_locks
138
    member, as a dict with lock levels as keys, and a list of needed lock names
139
    as values. Rules:
140

141
      - use an empty dict if you don't need any lock
142
      - if you don't need any lock at a particular level omit that level
143
      - don't put anything for the BGL level
144
      - if you want all locks at a level use locking.ALL_SET as a value
145

146
    If you need to share locks (rather than acquire them exclusively) at one
147
    level you can modify self.share_locks, setting a true value (usually 1) for
148
    that level. By default locks are not shared.
149

150
    Examples::
151

152
      # Acquire all nodes and one instance
153
      self.needed_locks = {
154
        locking.LEVEL_NODE: locking.ALL_SET,
155
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
156
      }
157
      # Acquire just two nodes
158
      self.needed_locks = {
159
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
160
      }
161
      # Acquire no locks
162
      self.needed_locks = {} # No, you can't leave it to the default value None
163

164
    """
165
    # The implementation of this method is mandatory only if the new LU is
166
    # concurrent, so that old LUs don't need to be changed all at the same
167
    # time.
168
    if self.REQ_BGL:
169
      self.needed_locks = {} # Exclusive LUs don't need locks.
170
    else:
171
      raise NotImplementedError
172

    
173
  def DeclareLocks(self, level):
174
    """Declare LU locking needs for a level
175

176
    While most LUs can just declare their locking needs at ExpandNames time,
177
    sometimes there is a need to calculate some locks after having acquired
178
    the ones before. This function is called just before acquiring locks at a
179
    particular level, but after acquiring the ones at lower levels, and permits
180
    such calculations. It can be used to modify self.needed_locks, and by
181
    default it does nothing.
182

183
    This function is only called if you have something already set in
184
    self.needed_locks for the level.
185

186
    @param level: Locking level which is going to be locked
187
    @type level: member of ganeti.locking.LEVELS
188

189
    """
190

    
191
  def CheckPrereq(self):
192
    """Check prerequisites for this LU.
193

194
    This method should check that the prerequisites for the execution
195
    of this LU are fulfilled. It can do internode communication, but
196
    it should be idempotent - no cluster or system changes are
197
    allowed.
198

199
    The method should raise errors.OpPrereqError in case something is
200
    not fulfilled. Its return value is ignored.
201

202
    This method should also update all the parameters of the opcode to
203
    their canonical form if it hasn't been done by ExpandNames before.
204

205
    """
206
    raise NotImplementedError
207

    
208
  def Exec(self, feedback_fn):
209
    """Execute the LU.
210

211
    This method should implement the actual work. It should raise
212
    errors.OpExecError for failures that are somewhat dealt with in
213
    code, or expected.
214

215
    """
216
    raise NotImplementedError
217

    
218
  def BuildHooksEnv(self):
219
    """Build hooks environment for this LU.
220

221
    This method should return a three-element tuple consisting of: a dict
222
    containing the environment that will be used for running the
223
    specific hook for this LU, a list of node names on which the hook
224
    should run before the execution, and a list of node names on which
225
    the hook should run after the execution.
226

227
    The keys of the dict must not be prefixed with 'GANETI_', as this will
228
    be handled in the hooks runner. Also note additional keys will be
229
    added by the hooks runner. If the LU doesn't define any
230
    environment, an empty dict (and not None) should be returned.
231

232
    "No nodes" should be represented as an empty list (and not None).
233

234
    Note that if the HPATH for a LU class is None, this function will
235
    not be called.
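
    Example return value (an illustrative sketch; the names used are
    hypothetical)::

      env = {"INSTANCE_NAME": "inst1.example.com"}
      nl = [self.cfg.GetMasterNode()]
      return env, nl, nl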
236

237
    """
238
    raise NotImplementedError
239

    
240
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
241
    """Notify the LU about the results of its hooks.
242

243
    This method is called every time a hooks phase is executed, and notifies
244
    the Logical Unit about the hooks' result. The LU can then use it to alter
245
    its result based on the hooks.  By default the method does nothing and the
246
    previous result is passed back unchanged but any LU can define it if it
247
    wants to use the local cluster hook-scripts somehow.
248

249
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
250
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
251
    @param hook_results: the results of the multi-node hooks rpc call
252
    @param feedback_fn: function used to send feedback back to the caller
253
    @param lu_result: the previous Exec result this LU had, or None
254
        in the PRE phase
255
    @return: the new Exec result, based on the previous result
256
        and hook results
257

258
    """
259
    return lu_result
260

    
261
  def _ExpandAndLockInstance(self):
262
    """Helper function to expand and lock an instance.
263

264
    Many LUs that work on an instance take its name in self.op.instance_name
265
    and need to expand it and then declare the expanded name for locking. This
266
    function does it, and then updates self.op.instance_name to the expanded
267
    name. It also initializes needed_locks as a dict, if this hasn't been done
268
    before.
269

270
    """
271
    if self.needed_locks is None:
272
      self.needed_locks = {}
273
    else:
274
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
275
        "_ExpandAndLockInstance called with instance-level locks set"
276
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
277
    if expanded_name is None:
278
      raise errors.OpPrereqError("Instance '%s' not known" %
279
                                  self.op.instance_name)
280
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
281
    self.op.instance_name = expanded_name
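
  # Illustrative usage sketch (not part of the original code): a typical
  # instance-level LU calls this helper from ExpandNames and then locks the
  # instance's nodes via DeclareLocks and _LockInstancesNodes:
  #
  #   def ExpandNames(self):
  #     self._ExpandAndLockInstance()
  #     self.needed_locks[locking.LEVEL_NODE] = []
  #     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
  #
  #   def DeclareLocks(self, level):
  #     if level == locking.LEVEL_NODE:
  #       self._LockInstancesNodes()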
282

    
283
  def _LockInstancesNodes(self, primary_only=False):
284
    """Helper function to declare instances' nodes for locking.
285

286
    This function should be called after locking one or more instances to lock
287
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
288
    with all primary or secondary nodes for instances already locked and
289
    present in self.needed_locks[locking.LEVEL_INSTANCE].
290

291
    It should be called from DeclareLocks, and for safety only works if
292
    self.recalculate_locks[locking.LEVEL_NODE] is set.
293

294
    In the future it may grow parameters to just lock some instance's nodes, or
295
    to lock only primary or secondary nodes, if needed.
296

297
    It should be called in DeclareLocks in a way similar to::
298

299
      if level == locking.LEVEL_NODE:
300
        self._LockInstancesNodes()
301

302
    @type primary_only: boolean
303
    @param primary_only: only lock primary nodes of locked instances
304

305
    """
306
    assert locking.LEVEL_NODE in self.recalculate_locks, \
307
      "_LockInstancesNodes helper function called with no nodes to recalculate"
308

    
309
    # TODO: check if we've really been called with the instance locks held
310

    
311
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
312
    # future we might want to have different behaviors depending on the value
313
    # of self.recalculate_locks[locking.LEVEL_NODE]
314
    wanted_nodes = []
315
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
316
      instance = self.context.cfg.GetInstanceInfo(instance_name)
317
      wanted_nodes.append(instance.primary_node)
318
      if not primary_only:
319
        wanted_nodes.extend(instance.secondary_nodes)
320

    
321
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
322
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
323
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
324
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
325

    
326
    del self.recalculate_locks[locking.LEVEL_NODE]
327

    
328

    
329
class NoHooksLU(LogicalUnit):
330
  """Simple LU which runs no hooks.
331

332
  This LU is intended as a parent for other LogicalUnits which will
333
  run no hooks, in order to reduce duplicate code.
334

335
  """
336
  HPATH = None
337
  HTYPE = None
338

    
339

    
340
def _GetWantedNodes(lu, nodes):
341
  """Returns list of checked and expanded node names.
342

343
  @type lu: L{LogicalUnit}
344
  @param lu: the logical unit on whose behalf we execute
345
  @type nodes: list
346
  @param nodes: list of node names or None for all nodes
347
  @rtype: list
348
  @return: the list of nodes, sorted
349
  @raise errors.OpPrereqError: if the nodes parameter is of the wrong type
350

351
  """
352
  if not isinstance(nodes, list):
353
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
354

    
355
  if not nodes:
356
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
357
      " non-empty list of nodes whose name is to be expanded.")
358

    
359
  wanted = []
360
  for name in nodes:
361
    node = lu.cfg.ExpandNodeName(name)
362
    if node is None:
363
      raise errors.OpPrereqError("No such node name '%s'" % name)
364
    wanted.append(node)
365

    
366
  return utils.NiceSort(wanted)
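
# Illustrative usage sketch (not part of the original code; the opcode
# attribute shown is hypothetical):
#
#   self.wanted = _GetWantedNodes(self, self.op.nodes)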
367

    
368

    
369
def _GetWantedInstances(lu, instances):
370
  """Returns list of checked and expanded instance names.
371

372
  @type lu: L{LogicalUnit}
373
  @param lu: the logical unit on whose behalf we execute
374
  @type instances: list
375
  @param instances: list of instance names or None for all instances
376
  @rtype: list
377
  @return: the list of instances, sorted
378
  @raise errors.OpPrereqError: if the instances parameter is of the wrong type
379
  @raise errors.OpPrereqError: if any of the passed instances is not found
380

381
  """
382
  if not isinstance(instances, list):
383
    raise errors.OpPrereqError("Invalid argument type 'instances'")
384

    
385
  if instances:
386
    wanted = []
387

    
388
    for name in instances:
389
      instance = lu.cfg.ExpandInstanceName(name)
390
      if instance is None:
391
        raise errors.OpPrereqError("No such instance name '%s'" % name)
392
      wanted.append(instance)
393

    
394
  else:
395
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
396
  return wanted
397

    
398

    
399
def _CheckOutputFields(static, dynamic, selected):
400
  """Checks whether all selected fields are valid.
401

402
  @type static: L{utils.FieldSet}
403
  @param static: static fields set
404
  @type dynamic: L{utils.FieldSet}
405
  @param dynamic: dynamic fields set
406

407
  """
408
  f = utils.FieldSet()
409
  f.Extend(static)
410
  f.Extend(dynamic)
411

    
412
  delta = f.NonMatching(selected)
413
  if delta:
414
    raise errors.OpPrereqError("Unknown output fields selected: %s"
415
                               % ",".join(delta))
416

    
417

    
418
def _CheckBooleanOpField(op, name):
419
  """Validates boolean opcode parameters.
420

421
  This will ensure that an opcode parameter is either a boolean value,
422
  or None (but that it always exists).
423

424
  """
425
  val = getattr(op, name, None)
426
  if not (val is None or isinstance(val, bool)):
427
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
428
                               (name, str(val)))
429
  setattr(op, name, val)
430

    
431

    
432
def _CheckNodeOnline(lu, node):
433
  """Ensure that a given node is online.
434

435
  @param lu: the LU on behalf of which we make the check
436
  @param node: the node to check
437
  @raise errors.OpPrereqError: if the node is offline
438

439
  """
440
  if lu.cfg.GetNodeInfo(node).offline:
441
    raise errors.OpPrereqError("Can't use offline node %s" % node)
442

    
443

    
444
def _CheckNodeNotDrained(lu, node):
445
  """Ensure that a given node is not drained.
446

447
  @param lu: the LU on behalf of which we make the check
448
  @param node: the node to check
449
  @raise errors.OpPrereqError: if the node is drained
450

451
  """
452
  if lu.cfg.GetNodeInfo(node).drained:
453
    raise errors.OpPrereqError("Can't use drained node %s" % node)
454

    
455

    
456
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
457
                          memory, vcpus, nics, disk_template, disks):
458
  """Builds instance related env variables for hooks
459

460
  This builds the hook environment from individual variables.
461

462
  @type name: string
463
  @param name: the name of the instance
464
  @type primary_node: string
465
  @param primary_node: the name of the instance's primary node
466
  @type secondary_nodes: list
467
  @param secondary_nodes: list of secondary nodes as strings
468
  @type os_type: string
469
  @param os_type: the name of the instance's OS
470
  @type status: boolean
471
  @param status: the should_run status of the instance
472
  @type memory: string
473
  @param memory: the memory size of the instance
474
  @type vcpus: string
475
  @param vcpus: the count of VCPUs the instance has
476
  @type nics: list
477
  @param nics: list of tuples (ip, bridge, mac) representing
478
      the NICs the instance has
479
  @type disk_template: string
480
  @param disk_template: the disk template of the instance
481
  @type disks: list
482
  @param disks: the list of (size, mode) pairs
483
  @rtype: dict
484
  @return: the hook environment for this instance
485

486
  """
487
  if status:
488
    str_status = "up"
489
  else:
490
    str_status = "down"
491
  env = {
492
    "OP_TARGET": name,
493
    "INSTANCE_NAME": name,
494
    "INSTANCE_PRIMARY": primary_node,
495
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
496
    "INSTANCE_OS_TYPE": os_type,
497
    "INSTANCE_STATUS": str_status,
498
    "INSTANCE_MEMORY": memory,
499
    "INSTANCE_VCPUS": vcpus,
500
    "INSTANCE_DISK_TEMPLATE": disk_template,
501
  }
502

    
503
  if nics:
504
    nic_count = len(nics)
505
    for idx, (ip, bridge, mac) in enumerate(nics):
506
      if ip is None:
507
        ip = ""
508
      env["INSTANCE_NIC%d_IP" % idx] = ip
509
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
510
      env["INSTANCE_NIC%d_MAC" % idx] = mac
511
  else:
512
    nic_count = 0
513

    
514
  env["INSTANCE_NIC_COUNT"] = nic_count
515

    
516
  if disks:
517
    disk_count = len(disks)
518
    for idx, (size, mode) in enumerate(disks):
519
      env["INSTANCE_DISK%d_SIZE" % idx] = size
520
      env["INSTANCE_DISK%d_MODE" % idx] = mode
521
  else:
522
    disk_count = 0
523

    
524
  env["INSTANCE_DISK_COUNT"] = disk_count
525

    
526
  return env
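
# Illustrative sketch of the environment produced above for a hypothetical
# single-NIC, single-disk instance (all values are made up):
#
#   {
#     "OP_TARGET": "inst1.example.com",
#     "INSTANCE_NAME": "inst1.example.com",
#     "INSTANCE_PRIMARY": "node1.example.com",
#     "INSTANCE_SECONDARIES": "node2.example.com",
#     "INSTANCE_OS_TYPE": "debian-etch",
#     "INSTANCE_STATUS": "up",
#     "INSTANCE_MEMORY": 128,
#     "INSTANCE_VCPUS": 1,
#     "INSTANCE_DISK_TEMPLATE": "drbd",
#     "INSTANCE_NIC_COUNT": 1,
#     "INSTANCE_NIC0_IP": "",
#     "INSTANCE_NIC0_BRIDGE": "xen-br0",
#     "INSTANCE_NIC0_MAC": "aa:00:00:00:00:01",
#     "INSTANCE_DISK_COUNT": 1,
#     "INSTANCE_DISK0_SIZE": 1024,
#     "INSTANCE_DISK0_MODE": "rw",
#   }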
527

    
528

    
529
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
530
  """Builds instance related env variables for hooks from an object.
531

532
  @type lu: L{LogicalUnit}
533
  @param lu: the logical unit on whose behalf we execute
534
  @type instance: L{objects.Instance}
535
  @param instance: the instance for which we should build the
536
      environment
537
  @type override: dict
538
  @param override: dictionary with key/values that will override
539
      our values
540
  @rtype: dict
541
  @return: the hook environment dictionary
542

543
  """
544
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
545
  args = {
546
    'name': instance.name,
547
    'primary_node': instance.primary_node,
548
    'secondary_nodes': instance.secondary_nodes,
549
    'os_type': instance.os,
550
    'status': instance.admin_up,
551
    'memory': bep[constants.BE_MEMORY],
552
    'vcpus': bep[constants.BE_VCPUS],
553
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
554
    'disk_template': instance.disk_template,
555
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
556
  }
557
  if override:
558
    args.update(override)
559
  return _BuildInstanceHookEnv(**args)
560

    
561

    
562
def _AdjustCandidatePool(lu):
563
  """Adjust the candidate pool after node operations.
564

565
  """
566
  mod_list = lu.cfg.MaintainCandidatePool()
567
  if mod_list:
568
    lu.LogInfo("Promoted nodes to master candidate role: %s",
569
               ", ".join(node.name for node in mod_list))
570
    for name in mod_list:
571
      lu.context.ReaddNode(name)
572
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
573
  if mc_now > mc_max:
574
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
575
               (mc_now, mc_max))
576

    
577

    
578
def _CheckInstanceBridgesExist(lu, instance):
579
  """Check that the brigdes needed by an instance exist.
580

581
  """
582
  # check bridges existence
583
  brlist = [nic.bridge for nic in instance.nics]
584
  result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
585
  result.Raise()
586
  if not result.data:
587
    raise errors.OpPrereqError("One or more target bridges %s does not"
588
                               " exist on destination node '%s'" %
589
                               (brlist, instance.primary_node))
590

    
591

    
592
class LUDestroyCluster(NoHooksLU):
593
  """Logical unit for destroying the cluster.
594

595
  """
596
  _OP_REQP = []
597

    
598
  def CheckPrereq(self):
599
    """Check prerequisites.
600

601
    This checks whether the cluster is empty.
602

603
    Any errors are signalled by raising errors.OpPrereqError.
604

605
    """
606
    master = self.cfg.GetMasterNode()
607

    
608
    nodelist = self.cfg.GetNodeList()
609
    if len(nodelist) != 1 or nodelist[0] != master:
610
      raise errors.OpPrereqError("There are still %d node(s) in"
611
                                 " this cluster." % (len(nodelist) - 1))
612
    instancelist = self.cfg.GetInstanceList()
613
    if instancelist:
614
      raise errors.OpPrereqError("There are still %d instance(s) in"
615
                                 " this cluster." % len(instancelist))
616

    
617
  def Exec(self, feedback_fn):
618
    """Destroys the cluster.
619

620
    """
621
    master = self.cfg.GetMasterNode()
622
    result = self.rpc.call_node_stop_master(master, False)
623
    result.Raise()
624
    if not result.data:
625
      raise errors.OpExecError("Could not disable the master role")
626
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
627
    utils.CreateBackup(priv_key)
628
    utils.CreateBackup(pub_key)
629
    return master
630

    
631

    
632
class LUVerifyCluster(LogicalUnit):
633
  """Verifies the cluster status.
634

635
  """
636
  HPATH = "cluster-verify"
637
  HTYPE = constants.HTYPE_CLUSTER
638
  _OP_REQP = ["skip_checks"]
639
  REQ_BGL = False
640

    
641
  def ExpandNames(self):
642
    self.needed_locks = {
643
      locking.LEVEL_NODE: locking.ALL_SET,
644
      locking.LEVEL_INSTANCE: locking.ALL_SET,
645
    }
646
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
647

    
648
  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
649
                  node_result, feedback_fn, master_files,
650
                  drbd_map, vg_name):
651
    """Run multiple tests against a node.
652

653
    Test list:
654

655
      - compares ganeti version
656
      - checks vg existence and size > 20G
657
      - checks config file checksum
658
      - checks ssh to other nodes
659

660
    @type nodeinfo: L{objects.Node}
661
    @param nodeinfo: the node to check
662
    @param file_list: required list of files
663
    @param local_cksum: dictionary of local files and their checksums
664
    @param node_result: the results from the node
665
    @param feedback_fn: function used to accumulate results
666
    @param master_files: list of files that only masters should have
667
    @param drbd_map: the used DRBD minors for this node, in
668
        form of minor: (instance, must_exist) which correspond to instances
669
        and their running status
670
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
671

672
    """
673
    node = nodeinfo.name
674

    
675
    # main result, node_result should be a non-empty dict
676
    if not node_result or not isinstance(node_result, dict):
677
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
678
      return True
679

    
680
    # compares ganeti version
681
    local_version = constants.PROTOCOL_VERSION
682
    remote_version = node_result.get('version', None)
683
    if not (remote_version and isinstance(remote_version, (list, tuple)) and
684
            len(remote_version) == 2):
685
      feedback_fn("  - ERROR: connection to %s failed" % (node))
686
      return True
687

    
688
    if local_version != remote_version[0]:
689
      feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
690
                  " node %s %s" % (local_version, node, remote_version[0]))
691
      return True
692

    
693
    # node seems compatible, we can actually try to look into its results
694

    
695
    bad = False
696

    
697
    # full package version
698
    if constants.RELEASE_VERSION != remote_version[1]:
699
      feedback_fn("  - WARNING: software version mismatch: master %s,"
700
                  " node %s %s" %
701
                  (constants.RELEASE_VERSION, node, remote_version[1]))
702

    
703
    # checks vg existence and size > 20G
704
    if vg_name is not None:
705
      vglist = node_result.get(constants.NV_VGLIST, None)
706
      if not vglist:
707
        feedback_fn("  - ERROR: unable to check volume groups on node %s." %
708
                        (node,))
709
        bad = True
710
      else:
711
        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
712
                                              constants.MIN_VG_SIZE)
713
        if vgstatus:
714
          feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
715
          bad = True
716

    
717
    # checks config file checksum
718

    
719
    remote_cksum = node_result.get(constants.NV_FILELIST, None)
720
    if not isinstance(remote_cksum, dict):
721
      bad = True
722
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
723
    else:
724
      for file_name in file_list:
725
        node_is_mc = nodeinfo.master_candidate
726
        must_have_file = file_name not in master_files
727
        if file_name not in remote_cksum:
728
          if node_is_mc or must_have_file:
729
            bad = True
730
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
731
        elif remote_cksum[file_name] != local_cksum[file_name]:
732
          if node_is_mc or must_have_file:
733
            bad = True
734
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
735
          else:
736
            # not candidate and this is not a must-have file
737
            bad = True
738
            feedback_fn("  - ERROR: non master-candidate has old/wrong file"
739
                        " '%s'" % file_name)
740
        else:
741
          # all good, except non-master/non-must have combination
742
          if not node_is_mc and not must_have_file:
743
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
744
                        " candidates" % file_name)
745

    
746
    # checks ssh to any
747

    
748
    if constants.NV_NODELIST not in node_result:
749
      bad = True
750
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
751
    else:
752
      if node_result[constants.NV_NODELIST]:
753
        bad = True
754
        for node in node_result[constants.NV_NODELIST]:
755
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
756
                          (node, node_result[constants.NV_NODELIST][node]))
757

    
758
    if constants.NV_NODENETTEST not in node_result:
759
      bad = True
760
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
761
    else:
762
      if node_result[constants.NV_NODENETTEST]:
763
        bad = True
764
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
765
        for node in nlist:
766
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
767
                          (node, node_result[constants.NV_NODENETTEST][node]))
768

    
769
    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
770
    if isinstance(hyp_result, dict):
771
      for hv_name, hv_result in hyp_result.iteritems():
772
        if hv_result is not None:
773
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
774
                      (hv_name, hv_result))
775

    
776
    # check used drbd list
777
    if vg_name is not None:
778
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
779
      if not isinstance(used_minors, (tuple, list)):
780
        feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
781
                    str(used_minors))
782
      else:
783
        for minor, (iname, must_exist) in drbd_map.items():
784
          if minor not in used_minors and must_exist:
785
            feedback_fn("  - ERROR: drbd minor %d of instance %s is not active" %
786
                        (minor, iname))
787
            bad = True
788
        for minor in used_minors:
789
          if minor not in drbd_map:
790
            feedback_fn("  - ERROR: unallocated drbd minor %d is in use" % minor)
791
            bad = True
792

    
793
    return bad
794

    
795
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
796
                      node_instance, feedback_fn, n_offline):
797
    """Verify an instance.
798

799
    This function checks to see if the required block devices are
800
    available on the instance's node.
801

802
    """
803
    bad = False
804

    
805
    node_current = instanceconfig.primary_node
806

    
807
    node_vol_should = {}
808
    instanceconfig.MapLVsByNode(node_vol_should)
809

    
810
    for node in node_vol_should:
811
      if node in n_offline:
812
        # ignore missing volumes on offline nodes
813
        continue
814
      for volume in node_vol_should[node]:
815
        if node not in node_vol_is or volume not in node_vol_is[node]:
816
          feedback_fn("  - ERROR: volume %s missing on node %s" %
817
                          (volume, node))
818
          bad = True
819

    
820
    if instanceconfig.admin_up:
821
      if ((node_current not in node_instance or
822
          not instance in node_instance[node_current]) and
823
          node_current not in n_offline):
824
        feedback_fn("  - ERROR: instance %s not running on node %s" %
825
                        (instance, node_current))
826
        bad = True
827

    
828
    for node in node_instance:
829
      if (not node == node_current):
830
        if instance in node_instance[node]:
831
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
832
                          (instance, node))
833
          bad = True
834

    
835
    return bad
836

    
837
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
838
    """Verify if there are any unknown volumes in the cluster.
839

840
    The .os, .swap and backup volumes are ignored. All other volumes are
841
    reported as unknown.
842

843
    """
844
    bad = False
845

    
846
    for node in node_vol_is:
847
      for volume in node_vol_is[node]:
848
        if node not in node_vol_should or volume not in node_vol_should[node]:
849
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
850
                      (volume, node))
851
          bad = True
852
    return bad
853

    
854
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
855
    """Verify the list of running instances.
856

857
    This checks what instances are running but unknown to the cluster.
858

859
    """
860
    bad = False
861
    for node in node_instance:
862
      for runninginstance in node_instance[node]:
863
        if runninginstance not in instancelist:
864
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
865
                          (runninginstance, node))
866
          bad = True
867
    return bad
868

    
869
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
870
    """Verify N+1 Memory Resilience.
871

872
    Check that if one single node dies we can still start all the instances it
873
    was primary for.
874

875
    """
876
    bad = False
877

    
878
    for node, nodeinfo in node_info.iteritems():
879
      # This code checks that every node which is now listed as secondary has
880
      # enough memory to host all instances it is supposed to host, should a single
881
      # other node in the cluster fail.
882
      # FIXME: not ready for failover to an arbitrary node
883
      # FIXME: does not support file-backed instances
884
      # WARNING: we currently take into account down instances as well as up
885
      # ones, considering that even if they're down someone might want to start
886
      # them even in the event of a node failure.
887
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
888
        needed_mem = 0
889
        for instance in instances:
890
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
891
          if bep[constants.BE_AUTO_BALANCE]:
892
            needed_mem += bep[constants.BE_MEMORY]
893
        if nodeinfo['mfree'] < needed_mem:
894
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
895
                      " failovers should node %s fail" % (node, prinode))
896
          bad = True
897
    return bad
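
  # Illustrative arithmetic sketch (made-up numbers): if node B is secondary
  # for two auto-balanced instances whose primary is node A, with BE_MEMORY
  # values of 512 and 1024, then needed_mem for the (B, A) pair is 1536; if B
  # reports mfree of 1024, the check above flags an N+1 error, since B could
  # not host both instances should A fail.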
898

    
899
  def CheckPrereq(self):
900
    """Check prerequisites.
901

902
    Transform the list of checks we're going to skip into a set and check that
903
    all its members are valid.
904

905
    """
906
    self.skip_set = frozenset(self.op.skip_checks)
907
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
908
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
909

    
910
  def BuildHooksEnv(self):
911
    """Build hooks env.
912

913
    Cluster-Verify hooks are run only in the post phase, and their failure makes
914
    their output be logged in the verify output and the verification fail.
915

916
    """
917
    all_nodes = self.cfg.GetNodeList()
918
    # TODO: populate the environment with useful information for verify hooks
919
    env = {}
920
    return env, [], all_nodes
921

    
922
  def Exec(self, feedback_fn):
923
    """Verify integrity of cluster, performing various test on nodes.
924

925
    """
926
    bad = False
927
    feedback_fn("* Verifying global settings")
928
    for msg in self.cfg.VerifyConfig():
929
      feedback_fn("  - ERROR: %s" % msg)
930

    
931
    vg_name = self.cfg.GetVGName()
932
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
933
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
934
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
935
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
936
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
937
                        for iname in instancelist)
938
    i_non_redundant = [] # Non redundant instances
939
    i_non_a_balanced = [] # Non auto-balanced instances
940
    n_offline = [] # List of offline nodes
941
    n_drained = [] # List of nodes being drained
942
    node_volume = {}
943
    node_instance = {}
944
    node_info = {}
945
    instance_cfg = {}
946

    
947
    # FIXME: verify OS list
948
    # do local checksums
949
    master_files = [constants.CLUSTER_CONF_FILE]
950

    
951
    file_names = ssconf.SimpleStore().GetFileList()
952
    file_names.append(constants.SSL_CERT_FILE)
953
    file_names.append(constants.RAPI_CERT_FILE)
954
    file_names.extend(master_files)
955

    
956
    local_checksums = utils.FingerprintFiles(file_names)
957

    
958
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
959
    node_verify_param = {
960
      constants.NV_FILELIST: file_names,
961
      constants.NV_NODELIST: [node.name for node in nodeinfo
962
                              if not node.offline],
963
      constants.NV_HYPERVISOR: hypervisors,
964
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
965
                                  node.secondary_ip) for node in nodeinfo
966
                                 if not node.offline],
967
      constants.NV_INSTANCELIST: hypervisors,
968
      constants.NV_VERSION: None,
969
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
970
      }
971
    if vg_name is not None:
972
      node_verify_param[constants.NV_VGLIST] = None
973
      node_verify_param[constants.NV_LVLIST] = vg_name
974
      node_verify_param[constants.NV_DRBDLIST] = None
975
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
976
                                           self.cfg.GetClusterName())
977

    
978
    cluster = self.cfg.GetClusterInfo()
979
    master_node = self.cfg.GetMasterNode()
980
    all_drbd_map = self.cfg.ComputeDRBDMap()
981

    
982
    for node_i in nodeinfo:
983
      node = node_i.name
984
      nresult = all_nvinfo[node].data
985

    
986
      if node_i.offline:
987
        feedback_fn("* Skipping offline node %s" % (node,))
988
        n_offline.append(node)
989
        continue
990

    
991
      if node == master_node:
992
        ntype = "master"
993
      elif node_i.master_candidate:
994
        ntype = "master candidate"
995
      elif node_i.drained:
996
        ntype = "drained"
997
        n_drained.append(node)
998
      else:
999
        ntype = "regular"
1000
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1001

    
1002
      if all_nvinfo[node].failed or not isinstance(nresult, dict):
1003
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
1004
        bad = True
1005
        continue
1006

    
1007
      node_drbd = {}
1008
      for minor, instance in all_drbd_map[node].items():
1009
        instance = instanceinfo[instance]
1010
        node_drbd[minor] = (instance.name, instance.admin_up)
1011
      result = self._VerifyNode(node_i, file_names, local_checksums,
1012
                                nresult, feedback_fn, master_files,
1013
                                node_drbd, vg_name)
1014
      bad = bad or result
1015

    
1016
      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1017
      if vg_name is None:
1018
        node_volume[node] = {}
1019
      elif isinstance(lvdata, basestring):
1020
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1021
                    (node, utils.SafeEncode(lvdata)))
1022
        bad = True
1023
        node_volume[node] = {}
1024
      elif not isinstance(lvdata, dict):
1025
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1026
        bad = True
1027
        continue
1028
      else:
1029
        node_volume[node] = lvdata
1030

    
1031
      # node_instance
1032
      idata = nresult.get(constants.NV_INSTANCELIST, None)
1033
      if not isinstance(idata, list):
1034
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1035
                    (node,))
1036
        bad = True
1037
        continue
1038

    
1039
      node_instance[node] = idata
1040

    
1041
      # node_info
1042
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
1043
      if not isinstance(nodeinfo, dict):
1044
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1045
        bad = True
1046
        continue
1047

    
1048
      try:
1049
        node_info[node] = {
1050
          "mfree": int(nodeinfo['memory_free']),
1051
          "pinst": [],
1052
          "sinst": [],
1053
          # dictionary holding all instances this node is secondary for,
1054
          # grouped by their primary node. Each key is a cluster node, and each
1055
          # value is a list of instances which have the key as primary and the
1056
          # current node as secondary.  this is handy to calculate N+1 memory
1057
          # availability if you can only failover from a primary to its
1058
          # secondary.
1059
          "sinst-by-pnode": {},
1060
        }
1061
        # FIXME: devise a free space model for file based instances as well
1062
        if vg_name is not None:
1063
          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1064
      except ValueError:
1065
        feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
1066
        bad = True
1067
        continue
1068

    
1069
    node_vol_should = {}
1070

    
1071
    for instance in instancelist:
1072
      feedback_fn("* Verifying instance %s" % instance)
1073
      inst_config = instanceinfo[instance]
1074
      result =  self._VerifyInstance(instance, inst_config, node_volume,
1075
                                     node_instance, feedback_fn, n_offline)
1076
      bad = bad or result
1077
      inst_nodes_offline = []
1078

    
1079
      inst_config.MapLVsByNode(node_vol_should)
1080

    
1081
      instance_cfg[instance] = inst_config
1082

    
1083
      pnode = inst_config.primary_node
1084
      if pnode in node_info:
1085
        node_info[pnode]['pinst'].append(instance)
1086
      elif pnode not in n_offline:
1087
        feedback_fn("  - ERROR: instance %s, connection to primary node"
1088
                    " %s failed" % (instance, pnode))
1089
        bad = True
1090

    
1091
      if pnode in n_offline:
1092
        inst_nodes_offline.append(pnode)
1093

    
1094
      # If the instance is non-redundant we cannot survive losing its primary
1095
      # node, so we are not N+1 compliant. On the other hand we have no disk
1096
      # templates with more than one secondary so that situation is not well
1097
      # supported either.
1098
      # FIXME: does not support file-backed instances
1099
      if len(inst_config.secondary_nodes) == 0:
1100
        i_non_redundant.append(instance)
1101
      elif len(inst_config.secondary_nodes) > 1:
1102
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
1103
                    % instance)
1104

    
1105
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1106
        i_non_a_balanced.append(instance)
1107

    
1108
      for snode in inst_config.secondary_nodes:
1109
        if snode in node_info:
1110
          node_info[snode]['sinst'].append(instance)
1111
          if pnode not in node_info[snode]['sinst-by-pnode']:
1112
            node_info[snode]['sinst-by-pnode'][pnode] = []
1113
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1114
        elif snode not in n_offline:
1115
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
1116
                      " %s failed" % (instance, snode))
1117
          bad = True
1118
        if snode in n_offline:
1119
          inst_nodes_offline.append(snode)
1120

    
1121
      if inst_nodes_offline:
1122
        # warn that the instance lives on offline nodes, and set bad=True
1123
        feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1124
                    ", ".join(inst_nodes_offline))
1125
        bad = True
1126

    
1127
    feedback_fn("* Verifying orphan volumes")
1128
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1129
                                       feedback_fn)
1130
    bad = bad or result
1131

    
1132
    feedback_fn("* Verifying remaining instances")
1133
    result = self._VerifyOrphanInstances(instancelist, node_instance,
1134
                                         feedback_fn)
1135
    bad = bad or result
1136

    
1137
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1138
      feedback_fn("* Verifying N+1 Memory redundancy")
1139
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1140
      bad = bad or result
1141

    
1142
    feedback_fn("* Other Notes")
1143
    if i_non_redundant:
1144
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1145
                  % len(i_non_redundant))
1146

    
1147
    if i_non_a_balanced:
1148
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1149
                  % len(i_non_a_balanced))
1150

    
1151
    if n_offline:
1152
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1153

    
1154
    if n_drained:
1155
      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1156

    
1157
    return not bad
1158

    
1159
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1160
    """Analize the post-hooks' result
1161

1162
    This method analyses the hook result, handles it, and sends some
1163
    nicely-formatted feedback back to the user.
1164

1165
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
1166
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1167
    @param hooks_results: the results of the multi-node hooks rpc call
1168
    @param feedback_fn: function used to send feedback back to the caller
1169
    @param lu_result: previous Exec result
1170
    @return: the new Exec result, based on the previous result
1171
        and hook results
1172

1173
    """
1174
    # We only really run POST phase hooks, and are only interested in
1175
    # their results
1176
    if phase == constants.HOOKS_PHASE_POST:
1177
      # Used to change hooks' output to proper indentation
1178
      indent_re = re.compile('^', re.M)
1179
      feedback_fn("* Hooks Results")
1180
      if not hooks_results:
1181
        feedback_fn("  - ERROR: general communication failure")
1182
        lu_result = 1
1183
      else:
1184
        for node_name in hooks_results:
1185
          show_node_header = True
1186
          res = hooks_results[node_name]
1187
          if res.failed or res.data is False or not isinstance(res.data, list):
1188
            if res.offline:
1189
              # no need to warn or set fail return value
1190
              continue
1191
            feedback_fn("    Communication failure in hooks execution")
1192
            lu_result = 1
1193
            continue
1194
          for script, hkr, output in res.data:
1195
            if hkr == constants.HKR_FAIL:
1196
              # The node header is only shown once, if there are
1197
              # failing hooks on that node
1198
              if show_node_header:
1199
                feedback_fn("  Node %s:" % node_name)
1200
                show_node_header = False
1201
              feedback_fn("    ERROR: Script %s failed, output:" % script)
1202
              output = indent_re.sub('      ', output)
1203
              feedback_fn("%s" % output)
1204
              lu_result = 1
1205

    
1206
      return lu_result
1207

    
1208

    
1209
class LUVerifyDisks(NoHooksLU):
1210
  """Verifies the cluster disks status.
1211

1212
  """
1213
  _OP_REQP = []
1214
  REQ_BGL = False
1215

    
1216
  def ExpandNames(self):
1217
    self.needed_locks = {
1218
      locking.LEVEL_NODE: locking.ALL_SET,
1219
      locking.LEVEL_INSTANCE: locking.ALL_SET,
1220
    }
1221
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1222

    
1223
  def CheckPrereq(self):
1224
    """Check prerequisites.
1225

1226
    This has no prerequisites.
1227

1228
    """
1229
    pass
1230

    
1231
  def Exec(self, feedback_fn):
1232
    """Verify integrity of cluster disks.
1233

1234
    """
1235
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1236

    
1237
    vg_name = self.cfg.GetVGName()
1238
    nodes = utils.NiceSort(self.cfg.GetNodeList())
1239
    instances = [self.cfg.GetInstanceInfo(name)
1240
                 for name in self.cfg.GetInstanceList()]
1241

    
1242
    nv_dict = {}
1243
    for inst in instances:
1244
      inst_lvs = {}
1245
      if (not inst.admin_up or
1246
          inst.disk_template not in constants.DTS_NET_MIRROR):
1247
        continue
1248
      inst.MapLVsByNode(inst_lvs)
1249
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1250
      for node, vol_list in inst_lvs.iteritems():
1251
        for vol in vol_list:
1252
          nv_dict[(node, vol)] = inst
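
    # Illustrative sketch (made-up names): an instance with
    # inst_lvs == {"node1": ["xenvg/lv1"], "node2": ["xenvg/lv1"]} yields
    # nv_dict entries ("node1", "xenvg/lv1") and ("node2", "xenvg/lv1"),
    # both mapping to that instance object.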
1253

    
1254
    if not nv_dict:
1255
      return result
1256

    
1257
    node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1258

    
1259
    to_act = set()
1260
    for node in nodes:
1261
      # node_volume
1262
      lvs = node_lvs[node]
1263
      if lvs.failed:
1264
        if not lvs.offline:
1265
          self.LogWarning("Connection to node %s failed: %s" %
1266
                          (node, lvs.data))
1267
        continue
1268
      lvs = lvs.data
1269
      if isinstance(lvs, basestring):
1270
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1271
        res_nlvm[node] = lvs
1272
      elif not isinstance(lvs, dict):
1273
        logging.warning("Connection to node %s failed or invalid data"
1274
                        " returned", node)
1275
        res_nodes.append(node)
1276
        continue
1277

    
1278
      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1279
        inst = nv_dict.pop((node, lv_name), None)
1280
        if (not lv_online and inst is not None
1281
            and inst.name not in res_instances):
1282
          res_instances.append(inst.name)
1283

    
1284
    # any leftover items in nv_dict are missing LVs, let's arrange the
1285
    # data better
1286
    for key, inst in nv_dict.iteritems():
1287
      if inst.name not in res_missing:
1288
        res_missing[inst.name] = []
1289
      res_missing[inst.name].append(key)
1290

    
1291
    return result
1292

    
1293

    
1294
class LURenameCluster(LogicalUnit):
1295
  """Rename the cluster.
1296

1297
  """
1298
  HPATH = "cluster-rename"
1299
  HTYPE = constants.HTYPE_CLUSTER
1300
  _OP_REQP = ["name"]
1301

    
1302
  def BuildHooksEnv(self):
1303
    """Build hooks env.
1304

1305
    """
1306
    env = {
1307
      "OP_TARGET": self.cfg.GetClusterName(),
1308
      "NEW_NAME": self.op.name,
1309
      }
1310
    mn = self.cfg.GetMasterNode()
1311
    return env, [mn], [mn]
1312

    
1313
  def CheckPrereq(self):
1314
    """Verify that the passed name is a valid one.
1315

1316
    """
1317
    hostname = utils.HostInfo(self.op.name)
1318

    
1319
    new_name = hostname.name
1320
    self.ip = new_ip = hostname.ip
1321
    old_name = self.cfg.GetClusterName()
1322
    old_ip = self.cfg.GetMasterIP()
1323
    if new_name == old_name and new_ip == old_ip:
1324
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1325
                                 " cluster has changed")
1326
    if new_ip != old_ip:
1327
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1328
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1329
                                   " reachable on the network. Aborting." %
1330
                                   new_ip)
1331

    
1332
    self.op.name = new_name
1333

    
1334
  def Exec(self, feedback_fn):
1335
    """Rename the cluster.
1336

1337
    """
1338
    clustername = self.op.name
1339
    ip = self.ip
1340

    
1341
    # shutdown the master IP
1342
    master = self.cfg.GetMasterNode()
1343
    result = self.rpc.call_node_stop_master(master, False)
1344
    if result.failed or not result.data:
1345
      raise errors.OpExecError("Could not disable the master role")
1346

    
1347
    try:
1348
      cluster = self.cfg.GetClusterInfo()
1349
      cluster.cluster_name = clustername
1350
      cluster.master_ip = ip
1351
      self.cfg.Update(cluster)
1352

    
1353
      # update the known hosts file
1354
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1355
      node_list = self.cfg.GetNodeList()
1356
      try:
1357
        node_list.remove(master)
1358
      except ValueError:
1359
        pass
1360
      result = self.rpc.call_upload_file(node_list,
1361
                                         constants.SSH_KNOWN_HOSTS_FILE)
1362
      for to_node, to_result in result.iteritems():
1363
        if to_result.failed or not to_result.data:
1364
          logging.error("Copy of file %s to node %s failed",
1365
                        constants.SSH_KNOWN_HOSTS_FILE, to_node)
1366

    
1367
    finally:
1368
      result = self.rpc.call_node_start_master(master, False)
1369
      if result.failed or not result.data:
1370
        self.LogWarning("Could not re-enable the master role on"
1371
                        " the master, please restart manually.")
1372

    
1373

    
1374
def _RecursiveCheckIfLVMBased(disk):
1375
  """Check if the given disk or its children are lvm-based.
1376

1377
  @type disk: L{objects.Disk}
1378
  @param disk: the disk to check
1379
  @rtype: boolean
1380
  @return: boolean indicating whether a LD_LV dev_type was found or not
1381

1382
  """
1383
  if disk.children:
1384
    for chdisk in disk.children:
1385
      if _RecursiveCheckIfLVMBased(chdisk):
1386
        return True
1387
  return disk.dev_type == constants.LD_LV
1388

    
1389

    
1390
class LUSetClusterParams(LogicalUnit):
1391
  """Change the parameters of the cluster.
1392

1393
  """
1394
  HPATH = "cluster-modify"
1395
  HTYPE = constants.HTYPE_CLUSTER
1396
  _OP_REQP = []
1397
  REQ_BGL = False
1398

    
1399
  def CheckParameters(self):
1400
    """Check parameters
1401

1402
    """
1403
    if not hasattr(self.op, "candidate_pool_size"):
1404
      self.op.candidate_pool_size = None
1405
    if self.op.candidate_pool_size is not None:
1406
      try:
1407
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1408
      except ValueError, err:
1409
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1410
                                   str(err))
1411
      if self.op.candidate_pool_size < 1:
1412
        raise errors.OpPrereqError("At least one master candidate needed")
1413

    
1414
  def ExpandNames(self):
1415
    # FIXME: in the future, modifying other cluster params may not require
1416
    # checking on all nodes.
1417
    self.needed_locks = {
1418
      locking.LEVEL_NODE: locking.ALL_SET,
1419
    }
1420
    self.share_locks[locking.LEVEL_NODE] = 1
1421

    
1422
  def BuildHooksEnv(self):
1423
    """Build hooks env.
1424

1425
    """
1426
    env = {
1427
      "OP_TARGET": self.cfg.GetClusterName(),
1428
      "NEW_VG_NAME": self.op.vg_name,
1429
      }
1430
    mn = self.cfg.GetMasterNode()
1431
    return env, [mn], [mn]
1432

    
1433
  def CheckPrereq(self):
1434
    """Check prerequisites.
1435

1436
    This checks that the given params don't conflict and that
1437
    the given volume group is valid.
1438

1439
    """
1440
    if self.op.vg_name is not None and not self.op.vg_name:
1441
      instances = self.cfg.GetAllInstancesInfo().values()
1442
      for inst in instances:
1443
        for disk in inst.disks:
1444
          if _RecursiveCheckIfLVMBased(disk):
1445
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1446
                                       " lvm-based instances exist")
1447

    
1448
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1449

    
1450
    # if vg_name not None, checks given volume group on all nodes
1451
    if self.op.vg_name:
1452
      vglist = self.rpc.call_vg_list(node_list)
1453
      for node in node_list:
1454
        if vglist[node].failed:
1455
          # ignoring down node
1456
          self.LogWarning("Node %s unreachable/error, ignoring" % node)
1457
          continue
1458
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1459
                                              self.op.vg_name,
1460
                                              constants.MIN_VG_SIZE)
1461
        if vgstatus:
1462
          raise errors.OpPrereqError("Error on node '%s': %s" %
1463
                                     (node, vgstatus))
1464

    
1465
    self.cluster = cluster = self.cfg.GetClusterInfo()
1466
    # validate beparams changes
1467
    if self.op.beparams:
1468
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1469
      self.new_beparams = cluster.FillDict(
1470
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1471

    
1472
    # hypervisor list/parameters
1473
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1474
    if self.op.hvparams:
1475
      if not isinstance(self.op.hvparams, dict):
1476
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1477
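      # merge the supplied per-hypervisor settings into the existing ones
      # rather than replacing each hypervisor's dict wholesale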
      for hv_name, hv_dict in self.op.hvparams.items():
1478
        if hv_name not in self.new_hvparams:
1479
          self.new_hvparams[hv_name] = hv_dict
1480
        else:
1481
          self.new_hvparams[hv_name].update(hv_dict)
1482

    
1483
    if self.op.enabled_hypervisors is not None:
1484
      self.hv_list = self.op.enabled_hypervisors
1485
    else:
1486
      self.hv_list = cluster.enabled_hypervisors
1487

    
1488
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1489
      # either the enabled list has changed, or the parameters have, validate
1490
      for hv_name, hv_params in self.new_hvparams.items():
1491
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1492
            (self.op.enabled_hypervisors and
1493
             hv_name in self.op.enabled_hypervisors)):
1494
          # either this is a new hypervisor, or its parameters have changed
1495
          hv_class = hypervisor.GetHypervisor(hv_name)
1496
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1497
          hv_class.CheckParameterSyntax(hv_params)
1498
          _CheckHVParams(self, node_list, hv_name, hv_params)
1499

    
1500
  def Exec(self, feedback_fn):
1501
    """Change the parameters of the cluster.
1502

1503
    """
1504
    if self.op.vg_name is not None:
1505
      if self.op.vg_name != self.cfg.GetVGName():
1506
        self.cfg.SetVGName(self.op.vg_name)
1507
      else:
1508
        feedback_fn("Cluster LVM configuration already in desired"
1509
                    " state, not changing")
1510
    if self.op.hvparams:
1511
      self.cluster.hvparams = self.new_hvparams
1512
    if self.op.enabled_hypervisors is not None:
1513
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1514
    if self.op.beparams:
1515
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1516
    if self.op.candidate_pool_size is not None:
1517
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1518

    
1519
    self.cfg.Update(self.cluster)
1520

    
1521
    # we want to update nodes after the cluster so that if any errors
1522
    # happen, we have recorded and saved the cluster info
1523
    if self.op.candidate_pool_size is not None:
1524
      _AdjustCandidatePool(self)
1525

    
1526

    
1527
class LURedistributeConfig(NoHooksLU):
1528
  """Force the redistribution of cluster configuration.
1529

1530
  This is a very simple LU.
1531

1532
  """
1533
  _OP_REQP = []
1534
  REQ_BGL = False
1535

    
1536
  def ExpandNames(self):
1537
    self.needed_locks = {
1538
      locking.LEVEL_NODE: locking.ALL_SET,
1539
    }
1540
    self.share_locks[locking.LEVEL_NODE] = 1
1541

    
1542
  def CheckPrereq(self):
1543
    """Check prerequisites.
1544

1545
    """
1546

    
1547
  def Exec(self, feedback_fn):
1548
    """Redistribute the configuration.
1549

1550
    """
1551
    self.cfg.Update(self.cfg.GetClusterInfo())
1552

    
1553

    
1554
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1555
  """Sleep and poll for an instance's disk to sync.
1556

1557
  """
1558
  if not instance.disks:
1559
    return True
1560

    
1561
  if not oneshot:
1562
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1563

    
1564
  node = instance.primary_node
1565

    
1566
  for dev in instance.disks:
1567
    lu.cfg.SetDiskID(dev, node)
1568

    
1569
  retries = 0
1570
  while True:
1571
    max_time = 0
1572
    done = True
1573
    cumul_degraded = False
1574
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1575
    if rstats.failed or not rstats.data:
1576
      lu.LogWarning("Can't get any data from node %s", node)
1577
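      # tolerate up to 10 consecutive failed polls (with a 6 second pause
      # between them) before giving up on the node entirely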
      retries += 1
1578
      if retries >= 10:
1579
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1580
                                 " aborting." % node)
1581
      time.sleep(6)
1582
      continue
1583
    rstats = rstats.data
1584
    retries = 0
1585
    for i, mstat in enumerate(rstats):
1586
      if mstat is None:
1587
        lu.LogWarning("Can't compute data for node %s/%s",
1588
                           node, instance.disks[i].iv_name)
1589
        continue
1590
      # we ignore the ldisk parameter
1591
      perc_done, est_time, is_degraded, _ = mstat
1592
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1593
      if perc_done is not None:
1594
        done = False
1595
        if est_time is not None:
1596
          rem_time = "%d estimated seconds remaining" % est_time
1597
          max_time = est_time
1598
        else:
1599
          rem_time = "no time estimate"
1600
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1601
                        (instance.disks[i].iv_name, perc_done, rem_time))
1602
    if done or oneshot:
1603
      break
1604

    
1605
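    # wait before polling again: at most 60 seconds, or less if the last
    # reported time estimate was shorter (max_time stays 0 when no
    # estimate was returned)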
    time.sleep(min(60, max_time))
1606

    
1607
  if done:
1608
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1609
  return not cumul_degraded
1610

    
1611

    
1612
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1613
  """Check that mirrors are not degraded.
1614

1615
  The ldisk parameter, if True, will change the test from the
1616
  is_degraded attribute (which represents overall non-ok status for
1617
  the device(s)) to the ldisk (representing the local storage status).
1618

1619
  """
1620
  lu.cfg.SetDiskID(dev, node)
1621
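  # index into the status tuple returned by blockdev_find: presumably
  # position 6 carries the local-disk (ldisk) status and position 5 the
  # overall is_degraded flag (see the docstring above)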
  if ldisk:
1622
    idx = 6
1623
  else:
1624
    idx = 5
1625

    
1626
  result = True
1627
  if on_primary or dev.AssembleOnSecondary():
1628
    rstats = lu.rpc.call_blockdev_find(node, dev)
1629
    msg = rstats.RemoteFailMsg()
1630
    if msg:
1631
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1632
      result = False
1633
    elif not rstats.payload:
1634
      lu.LogWarning("Can't find disk on node %s", node)
1635
      result = False
1636
    else:
1637
      result = result and (not rstats.payload[idx])
1638
  if dev.children:
1639
    for child in dev.children:
1640
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1641

    
1642
  return result
1643

    
1644

    
1645
class LUDiagnoseOS(NoHooksLU):
1646
  """Logical unit for OS diagnose/query.
1647

1648
  """
1649
  _OP_REQP = ["output_fields", "names"]
1650
  REQ_BGL = False
1651
  _FIELDS_STATIC = utils.FieldSet()
1652
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1653

    
1654
  def ExpandNames(self):
1655
    if self.op.names:
1656
      raise errors.OpPrereqError("Selective OS query not supported")
1657

    
1658
    _CheckOutputFields(static=self._FIELDS_STATIC,
1659
                       dynamic=self._FIELDS_DYNAMIC,
1660
                       selected=self.op.output_fields)
1661

    
1662
    # Lock all nodes, in shared mode
1663
    self.needed_locks = {}
1664
    self.share_locks[locking.LEVEL_NODE] = 1
1665
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1666

    
1667
  def CheckPrereq(self):
1668
    """Check prerequisites.
1669

1670
    """
1671

    
1672
  @staticmethod
1673
  def _DiagnoseByOS(node_list, rlist):
1674
    """Remaps a per-node return list into an a per-os per-node dictionary
1675

1676
    @param node_list: a list with the names of all nodes
1677
    @param rlist: a map with node names as keys and OS objects as values
1678

1679
    @rtype: dict
1680
    @return: a dictionary with osnames as keys and as value another map, with
1681
        nodes as keys and list of OS objects as values, eg::
1682

1683
          {"debian-etch": {"node1": [<object>,...],
1684
                           "node2": [<object>,]}
1685
          }
1686

1687
    """
1688
    all_os = {}
1689
    for node_name, nr in rlist.iteritems():
1690
      if nr.failed or not nr.data:
1691
        continue
1692
      for os_obj in nr.data:
1693
        if os_obj.name not in all_os:
1694
          # build a list of nodes for this os containing empty lists
1695
          # for each node in node_list
1696
          all_os[os_obj.name] = {}
1697
          for nname in node_list:
1698
            all_os[os_obj.name][nname] = []
1699
        all_os[os_obj.name][node_name].append(os_obj)
1700
    return all_os
1701

    
1702
  def Exec(self, feedback_fn):
1703
    """Compute the list of OSes.
1704

1705
    """
1706
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1707
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()
1708
                   if node in node_list]
1709
    node_data = self.rpc.call_os_diagnose(valid_nodes)
1710
    if node_data == False:
1711
      raise errors.OpExecError("Can't gather the list of OSes")
1712
    pol = self._DiagnoseByOS(valid_nodes, node_data)
1713
    output = []
1714
    for os_name, os_data in pol.iteritems():
1715
      row = []
1716
      for field in self.op.output_fields:
1717
        if field == "name":
1718
          val = os_name
1719
        elif field == "valid":
1720
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1721
        elif field == "node_status":
1722
          val = {}
1723
          for node_name, nos_list in os_data.iteritems():
1724
            val[node_name] = [(v.status, v.path) for v in nos_list]
1725
        else:
1726
          raise errors.ParameterError(field)
1727
        row.append(val)
1728
      output.append(row)
1729

    
1730
    return output
1731

    
1732

    
1733
class LURemoveNode(LogicalUnit):
1734
  """Logical unit for removing a node.
1735

1736
  """
1737
  HPATH = "node-remove"
1738
  HTYPE = constants.HTYPE_NODE
1739
  _OP_REQP = ["node_name"]
1740

    
1741
  def BuildHooksEnv(self):
1742
    """Build hooks env.
1743

1744
    This doesn't run on the target node in the pre phase as a failed
1745
    node would then be impossible to remove.
1746

1747
    """
1748
    env = {
1749
      "OP_TARGET": self.op.node_name,
1750
      "NODE_NAME": self.op.node_name,
1751
      }
1752
    all_nodes = self.cfg.GetNodeList()
1753
    all_nodes.remove(self.op.node_name)
1754
    return env, all_nodes, all_nodes
1755

    
1756
  def CheckPrereq(self):
1757
    """Check prerequisites.
1758

1759
    This checks:
1760
     - the node exists in the configuration
1761
     - it does not have primary or secondary instances
1762
     - it's not the master
1763

1764
    Any errors are signalled by raising errors.OpPrereqError.
1765

1766
    """
1767
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1768
    if node is None:
1769
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1770

    
1771
    instance_list = self.cfg.GetInstanceList()
1772

    
1773
    masternode = self.cfg.GetMasterNode()
1774
    if node.name == masternode:
1775
      raise errors.OpPrereqError("Node is the master node,"
1776
                                 " you need to failover first.")
1777

    
1778
    for instance_name in instance_list:
1779
      instance = self.cfg.GetInstanceInfo(instance_name)
1780
      if node.name in instance.all_nodes:
1781
        raise errors.OpPrereqError("Instance %s is still running on the node,"
1782
                                   " please remove first." % instance_name)
1783
    self.op.node_name = node.name
1784
    self.node = node
1785

    
1786
  def Exec(self, feedback_fn):
1787
    """Removes the node from the cluster.
1788

1789
    """
1790
    node = self.node
1791
    logging.info("Stopping the node daemon and removing configs from node %s",
1792
                 node.name)
1793

    
1794
    self.context.RemoveNode(node.name)
1795

    
1796
    self.rpc.call_node_leave_cluster(node.name)
1797

    
1798
    # Promote nodes to master candidate as needed
1799
    _AdjustCandidatePool(self)
1800

    
1801

    
1802
class LUQueryNodes(NoHooksLU):
1803
  """Logical unit for querying nodes.
1804

1805
  """
1806
  _OP_REQP = ["output_fields", "names", "use_locking"]
1807
  REQ_BGL = False
1808
  _FIELDS_DYNAMIC = utils.FieldSet(
1809
    "dtotal", "dfree",
1810
    "mtotal", "mnode", "mfree",
1811
    "bootid",
1812
    "ctotal", "cnodes", "csockets",
1813
    )
1814

    
1815
  _FIELDS_STATIC = utils.FieldSet(
1816
    "name", "pinst_cnt", "sinst_cnt",
1817
    "pinst_list", "sinst_list",
1818
    "pip", "sip", "tags",
1819
    "serial_no",
1820
    "master_candidate",
1821
    "master",
1822
    "offline",
1823
    "drained",
1824
    )
1825

    
1826
  def ExpandNames(self):
1827
    _CheckOutputFields(static=self._FIELDS_STATIC,
1828
                       dynamic=self._FIELDS_DYNAMIC,
1829
                       selected=self.op.output_fields)
1830

    
1831
    self.needed_locks = {}
1832
    self.share_locks[locking.LEVEL_NODE] = 1
1833

    
1834
    if self.op.names:
1835
      self.wanted = _GetWantedNodes(self, self.op.names)
1836
    else:
1837
      self.wanted = locking.ALL_SET
1838

    
1839
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1840
    self.do_locking = self.do_node_query and self.op.use_locking
1841
    if self.do_locking:
1842
      # if we don't request only static fields, we need to lock the nodes
1843
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
1844

    
1845

    
1846
  def CheckPrereq(self):
1847
    """Check prerequisites.
1848

1849
    """
1850
    # The validation of the node list is done in the _GetWantedNodes,
1851
    # if non empty, and if empty, there's no validation to do
1852
    pass
1853

    
1854
  def Exec(self, feedback_fn):
1855
    """Computes the list of nodes and their attributes.
1856

1857
    """
1858
    all_info = self.cfg.GetAllNodesInfo()
1859
    if self.do_locking:
1860
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1861
    elif self.wanted != locking.ALL_SET:
1862
      nodenames = self.wanted
1863
      missing = set(nodenames).difference(all_info.keys())
1864
      if missing:
1865
        raise errors.OpExecError(
1866
          "Some nodes were removed before retrieving their data: %s" % missing)
1867
    else:
1868
      nodenames = all_info.keys()
1869

    
1870
    nodenames = utils.NiceSort(nodenames)
1871
    nodelist = [all_info[name] for name in nodenames]
1872

    
1873
    # begin data gathering
1874

    
1875
    if self.do_node_query:
1876
      live_data = {}
1877
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1878
                                          self.cfg.GetHypervisorType())
1879
      for name in nodenames:
1880
        nodeinfo = node_data[name]
1881
        if not nodeinfo.failed and nodeinfo.data:
1882
          nodeinfo = nodeinfo.data
1883
          fn = utils.TryConvert
1884
          live_data[name] = {
1885
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1886
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1887
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
1888
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1889
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
1890
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1891
            "bootid": nodeinfo.get('bootid', None),
1892
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
1893
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
1894
            }
1895
        else:
1896
          live_data[name] = {}
1897
    else:
1898
      live_data = dict.fromkeys(nodenames, {})
1899

    
1900
    node_to_primary = dict([(name, set()) for name in nodenames])
1901
    node_to_secondary = dict([(name, set()) for name in nodenames])
1902

    
1903
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1904
                             "sinst_cnt", "sinst_list"))
1905
    if inst_fields & frozenset(self.op.output_fields):
1906
      instancelist = self.cfg.GetInstanceList()
1907

    
1908
      for instance_name in instancelist:
1909
        inst = self.cfg.GetInstanceInfo(instance_name)
1910
        if inst.primary_node in node_to_primary:
1911
          node_to_primary[inst.primary_node].add(inst.name)
1912
        for secnode in inst.secondary_nodes:
1913
          if secnode in node_to_secondary:
1914
            node_to_secondary[secnode].add(inst.name)
1915

    
1916
    master_node = self.cfg.GetMasterNode()
1917

    
1918
    # end data gathering
1919

    
1920
    output = []
1921
    for node in nodelist:
1922
      node_output = []
1923
      for field in self.op.output_fields:
1924
        if field == "name":
1925
          val = node.name
1926
        elif field == "pinst_list":
1927
          val = list(node_to_primary[node.name])
1928
        elif field == "sinst_list":
1929
          val = list(node_to_secondary[node.name])
1930
        elif field == "pinst_cnt":
1931
          val = len(node_to_primary[node.name])
1932
        elif field == "sinst_cnt":
1933
          val = len(node_to_secondary[node.name])
1934
        elif field == "pip":
1935
          val = node.primary_ip
1936
        elif field == "sip":
1937
          val = node.secondary_ip
1938
        elif field == "tags":
1939
          val = list(node.GetTags())
1940
        elif field == "serial_no":
1941
          val = node.serial_no
1942
        elif field == "master_candidate":
1943
          val = node.master_candidate
1944
        elif field == "master":
1945
          val = node.name == master_node
1946
        elif field == "offline":
1947
          val = node.offline
1948
        elif field == "drained":
1949
          val = node.drained
1950
        elif self._FIELDS_DYNAMIC.Matches(field):
1951
          val = live_data[node.name].get(field, None)
1952
        else:
1953
          raise errors.ParameterError(field)
1954
        node_output.append(val)
1955
      output.append(node_output)
1956

    
1957
    return output
1958

    
1959

    
1960
class LUQueryNodeVolumes(NoHooksLU):
1961
  """Logical unit for getting volumes on node(s).
1962

1963
  """
1964
  _OP_REQP = ["nodes", "output_fields"]
1965
  REQ_BGL = False
1966
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1967
  _FIELDS_STATIC = utils.FieldSet("node")
1968

    
1969
  def ExpandNames(self):
1970
    _CheckOutputFields(static=self._FIELDS_STATIC,
1971
                       dynamic=self._FIELDS_DYNAMIC,
1972
                       selected=self.op.output_fields)
1973

    
1974
    self.needed_locks = {}
1975
    self.share_locks[locking.LEVEL_NODE] = 1
1976
    if not self.op.nodes:
1977
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1978
    else:
1979
      self.needed_locks[locking.LEVEL_NODE] = \
1980
        _GetWantedNodes(self, self.op.nodes)
1981

    
1982
  def CheckPrereq(self):
1983
    """Check prerequisites.
1984

1985
    This checks that the fields required are valid output fields.
1986

1987
    """
1988
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1989

    
1990
  def Exec(self, feedback_fn):
1991
    """Computes the list of nodes and their attributes.
1992

1993
    """
1994
    nodenames = self.nodes
1995
    volumes = self.rpc.call_node_volumes(nodenames)
1996

    
1997
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
1998
             in self.cfg.GetInstanceList()]
1999

    
2000
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2001

    
2002
    output = []
2003
    for node in nodenames:
2004
      if node not in volumes or volumes[node].failed or not volumes[node].data:
2005
        continue
2006

    
2007
      node_vols = volumes[node].data[:]
2008
      node_vols.sort(key=lambda vol: vol['dev'])
2009

    
2010
      for vol in node_vols:
2011
        node_output = []
2012
        for field in self.op.output_fields:
2013
          if field == "node":
2014
            val = node
2015
          elif field == "phys":
2016
            val = vol['dev']
2017
          elif field == "vg":
2018
            val = vol['vg']
2019
          elif field == "name":
2020
            val = vol['name']
2021
          elif field == "size":
2022
            val = int(float(vol['size']))
2023
          elif field == "instance":
2024
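            # the for/else below falls through to '-' when no instance
            # owns this logical volume on the node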
            for inst in ilist:
2025
              if node not in lv_by_node[inst]:
2026
                continue
2027
              if vol['name'] in lv_by_node[inst][node]:
2028
                val = inst.name
2029
                break
2030
            else:
2031
              val = '-'
2032
          else:
2033
            raise errors.ParameterError(field)
2034
          node_output.append(str(val))
2035

    
2036
        output.append(node_output)
2037

    
2038
    return output
2039

    
2040

    
2041
class LUAddNode(LogicalUnit):
2042
  """Logical unit for adding node to the cluster.
2043

2044
  """
2045
  HPATH = "node-add"
2046
  HTYPE = constants.HTYPE_NODE
2047
  _OP_REQP = ["node_name"]
2048

    
2049
  def BuildHooksEnv(self):
2050
    """Build hooks env.
2051

2052
    This will run on all nodes before, and on all nodes + the new node after.
2053

2054
    """
2055
    env = {
2056
      "OP_TARGET": self.op.node_name,
2057
      "NODE_NAME": self.op.node_name,
2058
      "NODE_PIP": self.op.primary_ip,
2059
      "NODE_SIP": self.op.secondary_ip,
2060
      }
2061
    nodes_0 = self.cfg.GetNodeList()
2062
    nodes_1 = nodes_0 + [self.op.node_name, ]
2063
    return env, nodes_0, nodes_1
2064

    
2065
  def CheckPrereq(self):
2066
    """Check prerequisites.
2067

2068
    This checks:
2069
     - the new node is not already in the config
2070
     - it is resolvable
2071
     - its parameters (single/dual homed) match the cluster
2072

2073
    Any errors are signalled by raising errors.OpPrereqError.
2074

2075
    """
2076
    node_name = self.op.node_name
2077
    cfg = self.cfg
2078

    
2079
    dns_data = utils.HostInfo(node_name)
2080

    
2081
    node = dns_data.name
2082
    primary_ip = self.op.primary_ip = dns_data.ip
2083
    secondary_ip = getattr(self.op, "secondary_ip", None)
2084
    if secondary_ip is None:
2085
      secondary_ip = primary_ip
2086
    if not utils.IsValidIP(secondary_ip):
2087
      raise errors.OpPrereqError("Invalid secondary IP given")
2088
    self.op.secondary_ip = secondary_ip
2089

    
2090
    node_list = cfg.GetNodeList()
2091
    if not self.op.readd and node in node_list:
2092
      raise errors.OpPrereqError("Node %s is already in the configuration" %
2093
                                 node)
2094
    elif self.op.readd and node not in node_list:
2095
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2096

    
2097
    for existing_node_name in node_list:
2098
      existing_node = cfg.GetNodeInfo(existing_node_name)
2099

    
2100
      if self.op.readd and node == existing_node_name:
2101
        if (existing_node.primary_ip != primary_ip or
2102
            existing_node.secondary_ip != secondary_ip):
2103
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
2104
                                     " address configuration as before")
2105
        continue
2106

    
2107
      if (existing_node.primary_ip == primary_ip or
2108
          existing_node.secondary_ip == primary_ip or
2109
          existing_node.primary_ip == secondary_ip or
2110
          existing_node.secondary_ip == secondary_ip):
2111
        raise errors.OpPrereqError("New node ip address(es) conflict with"
2112
                                   " existing node %s" % existing_node.name)
2113

    
2114
    # check that the type of the node (single versus dual homed) is the
2115
    # same as for the master
2116
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2117
    master_singlehomed = myself.secondary_ip == myself.primary_ip
2118
    newbie_singlehomed = secondary_ip == primary_ip
2119
    if master_singlehomed != newbie_singlehomed:
2120
      if master_singlehomed:
2121
        raise errors.OpPrereqError("The master has no private ip but the"
2122
                                   " new node has one")
2123
      else:
2124
        raise errors.OpPrereqError("The master has a private ip but the"
2125
                                   " new node doesn't have one")
2126

    
2127
    # checks reachability
2128
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2129
      raise errors.OpPrereqError("Node not reachable by ping")
2130

    
2131
    if not newbie_singlehomed:
2132
      # check reachability from my secondary ip to newbie's secondary ip
2133
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2134
                           source=myself.secondary_ip):
2135
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2136
                                   " based ping to noded port")
2137

    
2138
    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2139
    mc_now, _ = self.cfg.GetMasterCandidateStats()
2140
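    # the new node becomes a master candidate only while the candidate
    # pool is not yet full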
    master_candidate = mc_now < cp_size
2141

    
2142
    self.new_node = objects.Node(name=node,
2143
                                 primary_ip=primary_ip,
2144
                                 secondary_ip=secondary_ip,
2145
                                 master_candidate=master_candidate,
2146
                                 offline=False, drained=False)
2147

    
2148
  def Exec(self, feedback_fn):
2149
    """Adds the new node to the cluster.
2150

2151
    """
2152
    new_node = self.new_node
2153
    node = new_node.name
2154

    
2155
    # check connectivity
2156
    result = self.rpc.call_version([node])[node]
2157
    result.Raise()
2158
    if result.data:
2159
      if constants.PROTOCOL_VERSION == result.data:
2160
        logging.info("Communication to node %s fine, sw version %s match",
2161
                     node, result.data)
2162
      else:
2163
        raise errors.OpExecError("Version mismatch master version %s,"
2164
                                 " node version %s" %
2165
                                 (constants.PROTOCOL_VERSION, result.data))
2166
    else:
2167
      raise errors.OpExecError("Cannot get version from the new node")
2168

    
2169
    # setup ssh on node
2170
    logging.info("Copy ssh key to node %s", node)
2171
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2172
    keyarray = []
2173
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2174
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2175
                priv_key, pub_key]
2176

    
2177
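    # read the key files in the order in which call_node_add expects its
    # positional key arguments below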
    for i in keyfiles:
2178
      f = open(i, 'r')
2179
      try:
2180
        keyarray.append(f.read())
2181
      finally:
2182
        f.close()
2183

    
2184
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2185
                                    keyarray[2],
2186
                                    keyarray[3], keyarray[4], keyarray[5])
2187

    
2188
    msg = result.RemoteFailMsg()
2189
    if msg:
2190
      raise errors.OpExecError("Cannot transfer ssh keys to the"
2191
                               " new node: %s" % msg)
2192

    
2193
    # Add node to our /etc/hosts, and add key to known_hosts
2194
    utils.AddHostToEtcHosts(new_node.name)
2195

    
2196
    if new_node.secondary_ip != new_node.primary_ip:
2197
      result = self.rpc.call_node_has_ip_address(new_node.name,
2198
                                                 new_node.secondary_ip)
2199
      if result.failed or not result.data:
2200
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2201
                                 " you gave (%s). Please fix and re-run this"
2202
                                 " command." % new_node.secondary_ip)
2203

    
2204
    node_verify_list = [self.cfg.GetMasterNode()]
2205
    node_verify_param = {
2206
      'nodelist': [node],
2207
      # TODO: do a node-net-test as well?
2208
    }
2209

    
2210
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2211
                                       self.cfg.GetClusterName())
2212
    for verifier in node_verify_list:
2213
      if result[verifier].failed or not result[verifier].data:
2214
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
2215
                                 " for remote verification" % verifier)
2216
      if result[verifier].data['nodelist']:
2217
        for failed in result[verifier].data['nodelist']:
2218
          feedback_fn("ssh/hostname verification failed %s -> %s" %
2219
                      (verifier, result[verifier].data['nodelist'][failed]))
2220
        raise errors.OpExecError("ssh/hostname verification failed.")
2221

    
2222
    # Distribute updated /etc/hosts and known_hosts to all nodes,
2223
    # including the node just added
2224
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2225
    dist_nodes = self.cfg.GetNodeList()
2226
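    # on a fresh add the new node is not yet part of the configuration, so
    # it has to be appended explicitly; on readd it is already included in
    # the node list above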
    if not self.op.readd:
2227
      dist_nodes.append(node)
2228
    if myself.name in dist_nodes:
2229
      dist_nodes.remove(myself.name)
2230

    
2231
    logging.debug("Copying hosts and known_hosts to all nodes")
2232
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2233
      result = self.rpc.call_upload_file(dist_nodes, fname)
2234
      for to_node, to_result in result.iteritems():
2235
        if to_result.failed or not to_result.data:
2236
          logging.error("Copy of file %s to node %s failed", fname, to_node)
2237

    
2238
    to_copy = []
2239
    enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2240
    if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
2241
      to_copy.append(constants.VNC_PASSWORD_FILE)
2242

    
2243
    for fname in to_copy:
2244
      result = self.rpc.call_upload_file([node], fname)
2245
      if result[node].failed or not result[node]:
2246
        logging.error("Could not copy file %s to node %s", fname, node)
2247

    
2248
    if self.op.readd:
2249
      self.context.ReaddNode(new_node)
2250
    else:
2251
      self.context.AddNode(new_node)
2252

    
2253

    
2254
class LUSetNodeParams(LogicalUnit):
2255
  """Modifies the parameters of a node.
2256

2257
  """
2258
  HPATH = "node-modify"
2259
  HTYPE = constants.HTYPE_NODE
2260
  _OP_REQP = ["node_name"]
2261
  REQ_BGL = False
2262

    
2263
  def CheckArguments(self):
2264
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
2265
    if node_name is None:
2266
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2267
    self.op.node_name = node_name
2268
    _CheckBooleanOpField(self.op, 'master_candidate')
2269
    _CheckBooleanOpField(self.op, 'offline')
2270
    _CheckBooleanOpField(self.op, 'drained')
2271
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2272
    if all_mods.count(None) == 3:
2273
      raise errors.OpPrereqError("Please pass at least one modification")
2274
    if all_mods.count(True) > 1:
2275
      raise errors.OpPrereqError("Can't set the node into more than one"
2276
                                 " state at the same time")
2277

    
2278
  def ExpandNames(self):
2279
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2280

    
2281
  def BuildHooksEnv(self):
2282
    """Build hooks env.
2283

2284
    This runs on the master node.
2285

2286
    """
2287
    env = {
2288
      "OP_TARGET": self.op.node_name,
2289
      "MASTER_CANDIDATE": str(self.op.master_candidate),
2290
      "OFFLINE": str(self.op.offline),
2291
      "DRAINED": str(self.op.drained),
2292
      }
2293
    nl = [self.cfg.GetMasterNode(),
2294
          self.op.node_name]
2295
    return env, nl, nl
2296

    
2297
  def CheckPrereq(self):
2298
    """Check prerequisites.
2299

2300
    This only checks the instance list against the existing names.
2301

2302
    """
2303
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2304

    
2305
    if ((self.op.master_candidate == False or self.op.offline == True or
2306
         self.op.drained == True) and node.master_candidate):
2307
      # we will demote the node from master_candidate
2308
      if self.op.node_name == self.cfg.GetMasterNode():
2309
        raise errors.OpPrereqError("The master node has to be a"
2310
                                   " master candidate, online and not drained")
2311
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2312
      num_candidates, _ = self.cfg.GetMasterCandidateStats()
2313
      if num_candidates <= cp_size:
2314
        msg = ("Not enough master candidates (desired"
2315
               " %d, new value will be %d)" % (cp_size, num_candidates-1))
2316
        if self.op.force:
2317
          self.LogWarning(msg)
2318
        else:
2319
          raise errors.OpPrereqError(msg)
2320

    
2321
    if (self.op.master_candidate == True and
2322
        ((node.offline and not self.op.offline == False) or
2323
         (node.drained and not self.op.drained == False))):
2324
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2325
                                 " to master_candidate")
2326

    
2327
    return
2328

    
2329
  def Exec(self, feedback_fn):
2330
    """Modifies a node.
2331

2332
    """
2333
    node = self.node
2334

    
2335
    result = []
2336
    changed_mc = False
2337

    
2338
    if self.op.offline is not None:
2339
      node.offline = self.op.offline
2340
      result.append(("offline", str(self.op.offline)))
2341
      if self.op.offline == True:
2342
        if node.master_candidate:
2343
          node.master_candidate = False
2344
          changed_mc = True
2345
          result.append(("master_candidate", "auto-demotion due to offline"))
2346
        if node.drained:
2347
          node.drained = False
2348
          result.append(("drained", "clear drained status due to offline"))
2349

    
2350
    if self.op.master_candidate is not None:
2351
      node.master_candidate = self.op.master_candidate
2352
      changed_mc = True
2353
      result.append(("master_candidate", str(self.op.master_candidate)))
2354
      if self.op.master_candidate == False:
2355
        rrc = self.rpc.call_node_demote_from_mc(node.name)
2356
        msg = rrc.RemoteFailMsg()
2357
        if msg:
2358
          self.LogWarning("Node failed to demote itself: %s" % msg)
2359

    
2360
    if self.op.drained is not None:
2361
      node.drained = self.op.drained
2362
      result.append(("drained", str(self.op.drained)))
2363
      if self.op.drained == True:
2364
        if node.master_candidate:
2365
          node.master_candidate = False
2366
          changed_mc = True
2367
          result.append(("master_candidate", "auto-demotion due to drain"))
2368
        if node.offline:
2369
          node.offline = False
2370
          result.append(("offline", "clear offline status due to drain"))
2371

    
2372
    # this will trigger configuration file update, if needed
2373
    self.cfg.Update(node)
2374
    # this will trigger job queue propagation or cleanup
2375
    if changed_mc:
2376
      self.context.ReaddNode(node)
2377

    
2378
    return result
2379

    
2380

    
2381
class LUQueryClusterInfo(NoHooksLU):
2382
  """Query cluster configuration.
2383

2384
  """
2385
  _OP_REQP = []
2386
  REQ_BGL = False
2387

    
2388
  def ExpandNames(self):
2389
    self.needed_locks = {}
2390

    
2391
  def CheckPrereq(self):
2392
    """No prerequsites needed for this LU.
2393

2394
    """
2395
    pass
2396

    
2397
  def Exec(self, feedback_fn):
2398
    """Return cluster config.
2399

2400
    """
2401
    cluster = self.cfg.GetClusterInfo()
2402
    result = {
2403
      "software_version": constants.RELEASE_VERSION,
2404
      "protocol_version": constants.PROTOCOL_VERSION,
2405
      "config_version": constants.CONFIG_VERSION,
2406
      "os_api_version": constants.OS_API_VERSION,
2407
      "export_version": constants.EXPORT_VERSION,
2408
      "architecture": (platform.architecture()[0], platform.machine()),
2409
      "name": cluster.cluster_name,
2410
      "master": cluster.master_node,
2411
      "default_hypervisor": cluster.default_hypervisor,
2412
      "enabled_hypervisors": cluster.enabled_hypervisors,
2413
      "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2414
                        for hypervisor in cluster.enabled_hypervisors]),
2415
      "beparams": cluster.beparams,
2416
      "candidate_pool_size": cluster.candidate_pool_size,
2417
      }
2418

    
2419
    return result
2420

    
2421

    
2422
class LUQueryConfigValues(NoHooksLU):
2423
  """Return configuration values.
2424

2425
  """
2426
  _OP_REQP = []
2427
  REQ_BGL = False
2428
  _FIELDS_DYNAMIC = utils.FieldSet()
2429
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2430

    
2431
  def ExpandNames(self):
2432
    self.needed_locks = {}
2433

    
2434
    _CheckOutputFields(static=self._FIELDS_STATIC,
2435
                       dynamic=self._FIELDS_DYNAMIC,
2436
                       selected=self.op.output_fields)
2437

    
2438
  def CheckPrereq(self):
2439
    """No prerequisites.
2440

2441
    """
2442
    pass
2443

    
2444
  def Exec(self, feedback_fn):
2445
    """Dump a representation of the cluster config to the standard output.
2446

2447
    """
2448
    values = []
2449
    for field in self.op.output_fields:
2450
      if field == "cluster_name":
2451
        entry = self.cfg.GetClusterName()
2452
      elif field == "master_node":
2453
        entry = self.cfg.GetMasterNode()
2454
      elif field == "drain_flag":
2455
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2456
      else:
2457
        raise errors.ParameterError(field)
2458
      values.append(entry)
2459
    return values
2460

    
2461

    
2462
class LUActivateInstanceDisks(NoHooksLU):
2463
  """Bring up an instance's disks.
2464

2465
  """
2466
  _OP_REQP = ["instance_name"]
2467
  REQ_BGL = False
2468

    
2469
  def ExpandNames(self):
2470
    self._ExpandAndLockInstance()
2471
    self.needed_locks[locking.LEVEL_NODE] = []
2472
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2473

    
2474
  def DeclareLocks(self, level):
2475
    if level == locking.LEVEL_NODE:
2476
      self._LockInstancesNodes()
2477

    
2478
  def CheckPrereq(self):
2479
    """Check prerequisites.
2480

2481
    This checks that the instance is in the cluster.
2482

2483
    """
2484
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2485
    assert self.instance is not None, \
2486
      "Cannot retrieve locked instance %s" % self.op.instance_name
2487
    _CheckNodeOnline(self, self.instance.primary_node)
2488

    
2489
  def Exec(self, feedback_fn):
2490
    """Activate the disks.
2491

2492
    """
2493
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2494
    if not disks_ok:
2495
      raise errors.OpExecError("Cannot activate block devices")
2496

    
2497
    return disks_info
2498

    
2499

    
2500
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2501
  """Prepare the block devices for an instance.
2502

2503
  This sets up the block devices on all nodes.
2504

2505
  @type lu: L{LogicalUnit}
2506
  @param lu: the logical unit on whose behalf we execute
2507
  @type instance: L{objects.Instance}
2508
  @param instance: the instance for whose disks we assemble
2509
  @type ignore_secondaries: boolean
2510
  @param ignore_secondaries: if true, errors on secondary nodes
2511
      won't result in an error return from the function
2512
  @return: False if the operation failed, otherwise a list of
2513
      (host, instance_visible_name, node_visible_name)
2514
      with the mapping from node devices to instance devices
2515

2516
  """
2517
  device_info = []
2518
  disks_ok = True
2519
  iname = instance.name
2520
  # With the two passes mechanism we try to reduce the window of
2521
  # opportunity for the race condition of switching DRBD to primary
2522
  # before handshaking occurred, but we do not eliminate it
2523

    
2524
  # The proper fix would be to wait (with some limits) until the
2525
  # connection has been made and drbd transitions from WFConnection
2526
  # into any other network-connected state (Connected, SyncTarget,
2527
  # SyncSource, etc.)
2528

    
2529
  # 1st pass, assemble on all nodes in secondary mode
2530
  for inst_disk in instance.disks:
2531
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2532
      lu.cfg.SetDiskID(node_disk, node)
2533
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2534
      msg = result.RemoteFailMsg()
2535
      if msg:
2536
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2537
                           " (is_primary=False, pass=1): %s",
2538
                           inst_disk.iv_name, node, msg)
2539
        if not ignore_secondaries:
2540
          disks_ok = False
2541

    
2542
  # FIXME: race condition on drbd migration to primary
2543

    
2544
  # 2nd pass, do only the primary node
2545
  for inst_disk in instance.disks:
2546
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2547
      if node != instance.primary_node:
2548
        continue
2549
      lu.cfg.SetDiskID(node_disk, node)
2550
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2551
      msg = result.RemoteFailMsg()
2552
      if msg:
2553
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2554
                           " (is_primary=True, pass=2): %s",
2555
                           inst_disk.iv_name, node, msg)
2556
        disks_ok = False
2557
    device_info.append((instance.primary_node, inst_disk.iv_name,
2558
                        result.payload))
2559

    
2560
  # leave the disks configured for the primary node
2561
  # this is a workaround that would be fixed better by
2562
  # improving the logical/physical id handling
2563
  for disk in instance.disks:
2564
    lu.cfg.SetDiskID(disk, instance.primary_node)
2565

    
2566
  return disks_ok, device_info
2567

    
2568

    
2569
def _StartInstanceDisks(lu, instance, force):
2570
  """Start the disks of an instance.
2571

2572
  """
2573
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2574
                                           ignore_secondaries=force)
2575
  if not disks_ok:
2576
    _ShutdownInstanceDisks(lu, instance)
2577
    if force is not None and not force:
2578
      lu.proc.LogWarning("", hint="If the message above refers to a"
2579
                         " secondary node,"
2580
                         " you can retry the operation using '--force'.")
2581
    raise errors.OpExecError("Disk consistency error")
2582

    
2583

    
2584
class LUDeactivateInstanceDisks(NoHooksLU):
2585
  """Shutdown an instance's disks.
2586

2587
  """
2588
  _OP_REQP = ["instance_name"]
2589
  REQ_BGL = False
2590

    
2591
  def ExpandNames(self):
2592
    self._ExpandAndLockInstance()
2593
    self.needed_locks[locking.LEVEL_NODE] = []
2594
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2595

    
2596
  def DeclareLocks(self, level):
2597
    if level == locking.LEVEL_NODE:
2598
      self._LockInstancesNodes()
2599

    
2600
  def CheckPrereq(self):
2601
    """Check prerequisites.
2602

2603
    This checks that the instance is in the cluster.
2604

2605
    """
2606
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2607
    assert self.instance is not None, \
2608
      "Cannot retrieve locked instance %s" % self.op.instance_name
2609

    
2610
  def Exec(self, feedback_fn):
2611
    """Deactivate the disks
2612

2613
    """
2614
    instance = self.instance
2615
    _SafeShutdownInstanceDisks(self, instance)
2616

    
2617

    
2618
def _SafeShutdownInstanceDisks(lu, instance):
2619
  """Shutdown block devices of an instance.
2620

2621
  This function checks if an instance is running, before calling
2622
  _ShutdownInstanceDisks.
2623

2624
  """
2625
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
2626
                                      [instance.hypervisor])
2627
  ins_l = ins_l[instance.primary_node]
2628
  if ins_l.failed or not isinstance(ins_l.data, list):
2629
    raise errors.OpExecError("Can't contact node '%s'" %
2630
                             instance.primary_node)
2631

    
2632
  if instance.name in ins_l.data:
2633
    raise errors.OpExecError("Instance is running, can't shutdown"
2634
                             " block devices.")
2635

    
2636
  _ShutdownInstanceDisks(lu, instance)
2637

    
2638

    
2639
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are ignored and
  do not cause the shutdown to be reported as failed.

  """
  all_result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
      msg = result.RemoteFailMsg()
      if msg:
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, node, msg)
        if not ignore_primary or node != instance.primary_node:
          all_result = False
  return all_result


def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node: C{str}
  @param node: the node to check
  @type reason: C{str}
  @param reason: string to use in the error message
  @type requested: C{int}
  @param requested: the amount of memory in MiB to check for
  @type hypervisor_name: C{str}
  @param hypervisor_name: the hypervisor to ask for memory stats
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
      we cannot check the node

  """
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
  nodeinfo[node].Raise()
  free_mem = nodeinfo[node].data.get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                             " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                             " needed %s MiB, available %s MiB" %
                             (node, reason, requested, free_mem))


class LUStartupInstance(LogicalUnit):
2697
  """Starts an instance.
2698

2699
  """
2700
  HPATH = "instance-start"
2701
  HTYPE = constants.HTYPE_INSTANCE
2702
  _OP_REQP = ["instance_name", "force"]
2703
  REQ_BGL = False
2704

    
2705
  def ExpandNames(self):
2706
    self._ExpandAndLockInstance()
2707

    
2708
  def BuildHooksEnv(self):
2709
    """Build hooks env.
2710

2711
    This runs on master, primary and secondary nodes of the instance.
2712

2713
    """
2714
    env = {
2715
      "FORCE": self.op.force,
2716
      }
2717
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2718
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2719
    return env, nl, nl
2720

    
2721
  def CheckPrereq(self):
2722
    """Check prerequisites.
2723

2724
    This checks that the instance is in the cluster.
2725

2726
    """
2727
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2728
    assert self.instance is not None, \
2729
      "Cannot retrieve locked instance %s" % self.op.instance_name
2730

    
2731
    _CheckNodeOnline(self, instance.primary_node)
2732

    
2733
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2734
    # check bridges existence
2735
    _CheckInstanceBridgesExist(self, instance)
2736

    
2737
    _CheckNodeFreeMemory(self, instance.primary_node,
2738
                         "starting instance %s" % instance.name,
2739
                         bep[constants.BE_MEMORY], instance.hypervisor)
2740

    
2741
  def Exec(self, feedback_fn):
2742
    """Start the instance.
2743

2744
    """
2745
    instance = self.instance
2746
    force = self.op.force
2747

    
2748
    self.cfg.MarkInstanceUp(instance.name)
2749

    
2750
    node_current = instance.primary_node
2751

    
2752
    _StartInstanceDisks(self, instance, force)
2753

    
2754
    result = self.rpc.call_instance_start(node_current, instance)
2755
    msg = result.RemoteFailMsg()
2756
    if msg:
2757
      _ShutdownInstanceDisks(self, instance)
2758
      raise errors.OpExecError("Could not start instance: %s" % msg)
2759

    
2760

    
2761
class LURebootInstance(LogicalUnit):
2762
  """Reboot an instance.
2763

2764
  """
2765
  HPATH = "instance-reboot"
2766
  HTYPE = constants.HTYPE_INSTANCE
2767
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2768
  REQ_BGL = False
2769

    
2770
  def ExpandNames(self):
2771
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2772
                                   constants.INSTANCE_REBOOT_HARD,
2773
                                   constants.INSTANCE_REBOOT_FULL]:
2774
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2775
                                  (constants.INSTANCE_REBOOT_SOFT,
2776
                                   constants.INSTANCE_REBOOT_HARD,
2777
                                   constants.INSTANCE_REBOOT_FULL))
2778
    self._ExpandAndLockInstance()
2779

    
2780
  def BuildHooksEnv(self):
2781
    """Build hooks env.
2782

2783
    This runs on master, primary and secondary nodes of the instance.
2784

2785
    """
2786
    env = {
2787
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2788
      "REBOOT_TYPE": self.op.reboot_type,
2789
      }
2790
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2791
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2792
    return env, nl, nl
2793

    
2794
  def CheckPrereq(self):
2795
    """Check prerequisites.
2796

2797
    This checks that the instance is in the cluster.
2798

2799
    """
2800
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2801
    assert self.instance is not None, \
2802
      "Cannot retrieve locked instance %s" % self.op.instance_name
2803

    
2804
    _CheckNodeOnline(self, instance.primary_node)
2805

    
2806
    # check bridges existence
2807
    _CheckInstanceBridgesExist(self, instance)
2808

    
2809
  def Exec(self, feedback_fn):
2810
    """Reboot the instance.
2811

2812
    """
2813
    instance = self.instance
2814
    ignore_secondaries = self.op.ignore_secondaries
2815
    reboot_type = self.op.reboot_type
2816

    
2817
    node_current = instance.primary_node
2818

    
2819
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2820
                       constants.INSTANCE_REBOOT_HARD]:
2821
      for disk in instance.disks:
2822
        self.cfg.SetDiskID(disk, node_current)
2823
      result = self.rpc.call_instance_reboot(node_current, instance,
2824
                                             reboot_type)
2825
      msg = result.RemoteFailMsg()
2826
      if msg:
2827
        raise errors.OpExecError("Could not reboot instance: %s" % msg)
2828
    else:
2829
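      # full reboot: shut the instance down completely, cycle its disks and
      # start it again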
      result = self.rpc.call_instance_shutdown(node_current, instance)
2830
      msg = result.RemoteFailMsg()
2831
      if msg:
2832
        raise errors.OpExecError("Could not shutdown instance for"
2833
                                 " full reboot: %s" % msg)
2834
      _ShutdownInstanceDisks(self, instance)
2835
      _StartInstanceDisks(self, instance, ignore_secondaries)
2836
      result = self.rpc.call_instance_start(node_current, instance)
2837
      msg = result.RemoteFailMsg()
2838
      if msg:
2839
        _ShutdownInstanceDisks(self, instance)
2840
        raise errors.OpExecError("Could not start instance for"
2841
                                 " full reboot: %s" % msg)
2842

    
2843
    self.cfg.MarkInstanceUp(instance.name)
2844

    
2845

    
2846
class LUShutdownInstance(LogicalUnit):
2847
  """Shutdown an instance.
2848

2849
  """
2850
  HPATH = "instance-stop"
2851
  HTYPE = constants.HTYPE_INSTANCE
2852
  _OP_REQP = ["instance_name"]
2853
  REQ_BGL = False
2854

    
2855
  def ExpandNames(self):
2856
    self._ExpandAndLockInstance()
2857

    
2858
  def BuildHooksEnv(self):
2859
    """Build hooks env.
2860

2861
    This runs on master, primary and secondary nodes of the instance.
2862

2863
    """
2864
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2865
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2866
    return env, nl, nl
2867

    
2868
  def CheckPrereq(self):
2869
    """Check prerequisites.
2870

2871
    This checks that the instance is in the cluster.
2872

2873
    """
2874
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2875
    assert self.instance is not None, \
2876
      "Cannot retrieve locked instance %s" % self.op.instance_name
2877
    _CheckNodeOnline(self, self.instance.primary_node)
2878

    
2879
  def Exec(self, feedback_fn):
2880
    """Shutdown the instance.
2881

2882
    """
2883
    instance = self.instance
2884
    node_current = instance.primary_node
2885
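    # record the intended state first; a failure of the shutdown RPC below
    # is only logged as a warning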
    self.cfg.MarkInstanceDown(instance.name)
2886
    result = self.rpc.call_instance_shutdown(node_current, instance)
2887
    msg = result.RemoteFailMsg()
2888
    if msg:
2889
      self.proc.LogWarning("Could not shutdown instance: %s" % msg)
2890

    
2891
    _ShutdownInstanceDisks(self, instance)
2892

    
2893

    
2894
class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, instance.primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    if remote_info.failed or remote_info.data:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   self.op.pnode)
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
      result.Raise()
      if not isinstance(result.data, objects.OS):
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node"  % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.Update(inst)

    _StartInstanceDisks(self, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      result = self.rpc.call_instance_os_add(inst.primary_node, inst)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s: %s" %
                                 (inst.name, inst.primary_node, msg))
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    _CheckNodeOnline(self, instance.primary_node)

    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise()
    if remote_info.data:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))


  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
                                                     old_file_storage_dir,
                                                     new_file_storage_dir)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result.data[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self, inst, None)
    try:
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
                                                 old_name)
      msg = result.RemoteFailMsg()
      if msg:
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti): %s" %
               (inst.name, inst.primary_node, msg))
        self.proc.LogWarning(msg)
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logging.info("Shutting down instance %s on node %s",
                 instance.name, instance.primary_node)

    result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
    msg = result.RemoteFailMsg()
    if msg:
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance: %s" % msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, instance.primary_node, msg))

    logging.info("Removing block devices for instance %s", instance.name)

    if not _RemoveDisks(self, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logging.info("Removing instance %s out of cluster config", instance.name)

    self.cfg.RemoveInstance(instance.name)
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name


class LUQueryInstances(NoHooksLU):
3163
  """Logical unit for querying instances.
3164

3165
  """
3166
  _OP_REQP = ["output_fields", "names", "use_locking"]
3167
  REQ_BGL = False
3168
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3169
                                    "admin_state",
3170
                                    "disk_template", "ip", "mac", "bridge",
3171
                                    "sda_size", "sdb_size", "vcpus", "tags",
3172
                                    "network_port", "beparams",
3173
                                    r"(disk)\.(size)/([0-9]+)",
3174
                                    r"(disk)\.(sizes)", "disk_usage",
3175
                                    r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3176
                                    r"(nic)\.(macs|ips|bridges)",
3177
                                    r"(disk|nic)\.(count)",
3178
                                    "serial_no", "hypervisor", "hvparams",] +
3179
                                  ["hv/%s" % name
3180
                                   for name in constants.HVS_PARAMETERS] +
3181
                                  ["be/%s" % name
3182
                                   for name in constants.BES_PARAMETERS])
3183
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3184

    
3185

    
3186
  def ExpandNames(self):
3187
    _CheckOutputFields(static=self._FIELDS_STATIC,
3188
                       dynamic=self._FIELDS_DYNAMIC,
3189
                       selected=self.op.output_fields)
3190

    
3191
    self.needed_locks = {}
3192
    self.share_locks[locking.LEVEL_INSTANCE] = 1
3193
    self.share_locks[locking.LEVEL_NODE] = 1
3194

    
3195
    if self.op.names:
3196
      self.wanted = _GetWantedInstances(self, self.op.names)
3197
    else:
3198
      self.wanted = locking.ALL_SET
3199

    
3200
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3201
    self.do_locking = self.do_node_query and self.op.use_locking
3202
    if self.do_locking:
3203
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3204
      self.needed_locks[locking.LEVEL_NODE] = []
3205
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3206

    
3207
  def DeclareLocks(self, level):
3208
    if level == locking.LEVEL_NODE and self.do_locking:
3209
      self._LockInstancesNodes()
3210

    
3211
  def CheckPrereq(self):
3212
    """Check prerequisites.
3213

3214
    """
3215
    pass
3216

    
3217
  def Exec(self, feedback_fn):
3218
    """Computes the list of nodes and their attributes.
3219

3220
    """
3221
    all_info = self.cfg.GetAllInstancesInfo()
3222
    if self.wanted == locking.ALL_SET:
3223
      # caller didn't specify instance names, so ordering is not important
3224
      if self.do_locking:
3225
        instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3226
      else:
3227
        instance_names = all_info.keys()
3228
      instance_names = utils.NiceSort(instance_names)
3229
    else:
3230
      # caller did specify names, so we must keep the ordering
3231
      if self.do_locking:
3232
        tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3233
      else:
3234
        tgt_set = all_info.keys()
3235
      missing = set(self.wanted).difference(tgt_set)
3236
      if missing:
3237
        raise errors.OpExecError("Some instances were removed before"
3238
                                 " retrieving their data: %s" % missing)
3239
      instance_names = self.wanted
3240

    
3241
    instance_list = [all_info[iname] for iname in instance_names]
3242

    
3243
    # begin data gathering
3244

    
3245
    nodes = frozenset([inst.primary_node for inst in instance_list])
3246
    hv_list = list(set([inst.hypervisor for inst in instance_list]))
3247

    
3248
    bad_nodes = []
3249
    off_nodes = []
3250
    if self.do_node_query:
3251
      live_data = {}
3252
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3253
      for name in nodes:
3254
        result = node_data[name]
3255
        if result.offline:
3256
          # offline nodes will be in both lists
3257
          off_nodes.append(name)
3258
        if result.failed:
3259
          bad_nodes.append(name)
3260
        else:
3261
          if result.data:
3262
            live_data.update(result.data)
3263
            # else no instance is alive
3264
    else:
3265
      live_data = dict([(name, {}) for name in instance_names])
3266

    
3267
    # end data gathering
3268

    
3269
    HVPREFIX = "hv/"
3270
    BEPREFIX = "be/"
3271
    output = []
3272
    for instance in instance_list:
3273
      iout = []
3274
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3275
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
3276
      for field in self.op.output_fields:
3277
        st_match = self._FIELDS_STATIC.Matches(field)
3278
        if field == "name":
3279
          val = instance.name
3280
        elif field == "os":
3281
          val = instance.os
3282
        elif field == "pnode":
3283
          val = instance.primary_node
3284
        elif field == "snodes":
3285
          val = list(instance.secondary_nodes)
3286
        elif field == "admin_state":
3287
          val = instance.admin_up
3288
        elif field == "oper_state":
3289
          if instance.primary_node in bad_nodes:
3290
            val = None
3291
          else:
3292
            val = bool(live_data.get(instance.name))
3293
        elif field == "status":
3294
          if instance.primary_node in off_nodes:
3295
            val = "ERROR_nodeoffline"
3296
          elif instance.primary_node in bad_nodes:
3297
            val = "ERROR_nodedown"
3298
          else:
3299
            running = bool(live_data.get(instance.name))
3300
            if running:
3301
              if instance.admin_up:
3302
                val = "running"
3303
              else:
3304
                val = "ERROR_up"
3305
            else:
3306
              if instance.admin_up:
3307
                val = "ERROR_down"
3308
              else:
3309
                val = "ADMIN_down"
3310
        elif field == "oper_ram":
3311
          if instance.primary_node in bad_nodes:
3312
            val = None
3313
          elif instance.name in live_data:
3314
            val = live_data[instance.name].get("memory", "?")
3315
          else:
3316
            val = "-"
3317
        elif field == "disk_template":
3318
          val = instance.disk_template
3319
        elif field == "ip":
3320
          val = instance.nics[0].ip
3321
        elif field == "bridge":
3322
          val = instance.nics[0].bridge
3323
        elif field == "mac":
3324
          val = instance.nics[0].mac
3325
        elif field == "sda_size" or field == "sdb_size":
3326
          idx = ord(field[2]) - ord('a')
3327
          try:
3328
            val = instance.FindDisk(idx).size
3329
          except errors.OpPrereqError:
3330
            val = None
3331
        elif field == "disk_usage": # total disk usage per node
3332
          disk_sizes = [{'size': disk.size} for disk in instance.disks]
3333
          val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3334
        elif field == "tags":
3335
          val = list(instance.GetTags())
3336
        elif field == "serial_no":
3337
          val = instance.serial_no
3338
        elif field == "network_port":
3339
          val = instance.network_port
3340
        elif field == "hypervisor":
3341
          val = instance.hypervisor
3342
        elif field == "hvparams":
3343
          val = i_hv
3344
        elif (field.startswith(HVPREFIX) and
3345
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3346
          val = i_hv.get(field[len(HVPREFIX):], None)
3347
        elif field == "beparams":
3348
          val = i_be
3349
        elif (field.startswith(BEPREFIX) and
3350
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3351
          val = i_be.get(field[len(BEPREFIX):], None)
3352
        elif st_match and st_match.groups():
3353
          # matches a variable list
3354
          st_groups = st_match.groups()
3355
          if st_groups and st_groups[0] == "disk":
3356
            if st_groups[1] == "count":
3357
              val = len(instance.disks)
3358
            elif st_groups[1] == "sizes":
3359
              val = [disk.size for disk in instance.disks]
3360
            elif st_groups[1] == "size":
3361
              try:
3362
                val = instance.FindDisk(st_groups[2]).size
3363
              except errors.OpPrereqError:
3364
                val = None
3365
            else:
3366
              assert False, "Unhandled disk parameter"
3367
          elif st_groups[0] == "nic":
3368
            if st_groups[1] == "count":
3369
              val = len(instance.nics)
3370
            elif st_groups[1] == "macs":
3371
              val = [nic.mac for nic in instance.nics]
3372
            elif st_groups[1] == "ips":
3373
              val = [nic.ip for nic in instance.nics]
3374
            elif st_groups[1] == "bridges":
3375
              val = [nic.bridge for nic in instance.nics]
3376
            else:
3377
              # index-based item
3378
              nic_idx = int(st_groups[2])
3379
              if nic_idx >= len(instance.nics):
3380
                val = None
3381
              else:
3382
                if st_groups[1] == "mac":
3383
                  val = instance.nics[nic_idx].mac
3384
                elif st_groups[1] == "ip":
3385
                  val = instance.nics[nic_idx].ip
3386
                elif st_groups[1] == "bridge":
3387
                  val = instance.nics[nic_idx].bridge
3388
                else:
3389
                  assert False, "Unhandled NIC parameter"
3390
          else:
3391
            assert False, "Unhandled variable parameter"
3392
        else:
3393
          raise errors.ParameterError(field)
3394
        iout.append(val)
3395
      output.append(iout)
3396

    
3397
    return output
3398

    
3399

    
3400
class LUFailoverInstance(LogicalUnit):
3401
  """Failover an instance.
3402

3403
  """
3404
  HPATH = "instance-failover"
3405
  HTYPE = constants.HTYPE_INSTANCE
3406
  _OP_REQP = ["instance_name", "ignore_consistency"]
3407
  REQ_BGL = False
3408

    
3409
  def ExpandNames(self):
3410
    self._ExpandAndLockInstance()
3411
    self.needed_locks[locking.LEVEL_NODE] = []
3412
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3413

    
3414
  def DeclareLocks(self, level):
3415
    if level == locking.LEVEL_NODE:
3416
      self._LockInstancesNodes()
3417

    
3418
  def BuildHooksEnv(self):
3419
    """Build hooks env.
3420

3421
    This runs on master, primary and secondary nodes of the instance.
3422

3423
    """
3424
    env = {
3425
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3426
      }
3427
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3428
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3429
    return env, nl, nl
3430

    
3431
  def CheckPrereq(self):
3432
    """Check prerequisites.
3433

3434
    This checks that the instance is in the cluster.
3435

3436
    """
3437
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3438
    assert self.instance is not None, \
3439
      "Cannot retrieve locked instance %s" % self.op.instance_name
3440

    
3441
    bep = self.cfg.GetClusterInfo().FillBE(instance)
3442
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3443
      raise errors.OpPrereqError("Instance's disk layout is not"
3444
                                 " network mirrored, cannot failover.")
3445

    
3446
    secondary_nodes = instance.secondary_nodes
3447
    if not secondary_nodes:
3448
      raise errors.ProgrammerError("no secondary node but using "
3449
                                   "a mirrored disk template")
3450

    
3451
    target_node = secondary_nodes[0]
3452
    _CheckNodeOnline(self, target_node)
3453
    _CheckNodeNotDrained(self, target_node)
3454
    # check memory requirements on the secondary node
3455
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3456
                         instance.name, bep[constants.BE_MEMORY],
3457
                         instance.hypervisor)
3458

    
3459
    # check bridge existance
3460
    brlist = [nic.bridge for nic in instance.nics]
3461
    result = self.rpc.call_bridges_exist(target_node, brlist)
3462
    result.Raise()
3463
    if not result.data:
3464
      raise errors.OpPrereqError("One or more target bridges %s does not"
3465
                                 " exist on destination node '%s'" %
3466
                                 (brlist, target_node))
3467

    
3468
  def Exec(self, feedback_fn):
3469
    """Failover an instance.
3470

3471
    The failover is done by shutting it down on its present node and
3472
    starting it on the secondary.
3473

3474
    """
3475
    instance = self.instance
3476

    
3477
    source_node = instance.primary_node
3478
    target_node = instance.secondary_nodes[0]
3479

    
3480
    feedback_fn("* checking disk consistency between source and target")
3481
    for dev in instance.disks:
3482
      # for drbd, these are drbd over lvm
3483
      if not _CheckDiskConsistency(self, dev, target_node, False):
3484
        if instance.admin_up and not self.op.ignore_consistency:
3485
          raise errors.OpExecError("Disk %s is degraded on target node,"
3486
                                   " aborting failover." % dev.iv_name)
3487

    
3488
    feedback_fn("* shutting down instance on source node")
3489
    logging.info("Shutting down instance %s on node %s",
3490
                 instance.name, source_node)
3491

    
3492
    result = self.rpc.call_instance_shutdown(source_node, instance)
3493
    msg = result.RemoteFailMsg()
3494
    if msg:
3495
      if self.op.ignore_consistency:
3496
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
3497
                             " Proceeding anyway. Please make sure node"
3498
                             " %s is down. Error details: %s",
3499
                             instance.name, source_node, source_node, msg)
3500
      else:
3501
        raise errors.OpExecError("Could not shutdown instance %s on"
3502
                                 " node %s: %s" %
3503
                                 (instance.name, source_node, msg))
3504

    
3505
    feedback_fn("* deactivating the instance's disks on source node")
3506
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3507
      raise errors.OpExecError("Can't shut down the instance's disks.")
3508

    
3509
    instance.primary_node = target_node
3510
    # distribute new instance config to the other nodes
3511
    self.cfg.Update(instance)
3512

    
3513
    # Only start the instance if it's marked as up
3514
    if instance.admin_up:
3515
      feedback_fn("* activating the instance's disks on target node")
3516
      logging.info("Starting instance %s on node %s",
3517
                   instance.name, target_node)
3518

    
3519
      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3520
                                               ignore_secondaries=True)
3521
      if not disks_ok:
3522
        _ShutdownInstanceDisks(self, instance)
3523
        raise errors.OpExecError("Can't activate the instance's disks")
3524

    
3525
      feedback_fn("* starting the instance on the target node")
3526
      result = self.rpc.call_instance_start(target_node, instance)
3527
      msg = result.RemoteFailMsg()
3528
      if msg:
3529
        _ShutdownInstanceDisks(self, instance)
3530
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3531
                                 (instance.name, target_node, msg))
3532

    
3533

    
3534
class LUMigrateInstance(LogicalUnit):
3535
  """Migrate an instance.
3536

3537
  This is migration without shutting down, compared to the failover,
3538
  which is done with shutdown.
3539

3540
  """
3541
  HPATH = "instance-migrate"
3542
  HTYPE = constants.HTYPE_INSTANCE
3543
  _OP_REQP = ["instance_name", "live", "cleanup"]
3544

    
3545
  REQ_BGL = False
3546

    
3547
  def ExpandNames(self):
3548
    self._ExpandAndLockInstance()
3549
    self.needed_locks[locking.LEVEL_NODE] = []
3550
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3551

    
3552
  def DeclareLocks(self, level):
3553
    if level == locking.LEVEL_NODE:
3554
      self._LockInstancesNodes()
3555

    
3556
  def BuildHooksEnv(self):
3557
    """Build hooks env.
3558

3559
    This runs on master, primary and secondary nodes of the instance.
3560

3561
    """
3562
    env = _BuildInstanceHookEnvByObject(self, self.instance)
3563
    env["MIGRATE_LIVE"] = self.op.live
3564
    env["MIGRATE_CLEANUP"] = self.op.cleanup
3565
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3566
    return env, nl, nl
3567

    
3568
  def CheckPrereq(self):
3569
    """Check prerequisites.
3570

3571
    This checks that the instance is in the cluster.
3572

3573
    """
3574
    instance = self.cfg.GetInstanceInfo(
3575
      self.cfg.ExpandInstanceName(self.op.instance_name))
3576
    if instance is None:
3577
      raise errors.OpPrereqError("Instance '%s' not known" %
3578
                                 self.op.instance_name)
3579

    
3580
    if instance.disk_template != constants.DT_DRBD8:
3581
      raise errors.OpPrereqError("Instance's disk layout is not"
3582
                                 " drbd8, cannot migrate.")
3583

    
3584
    secondary_nodes = instance.secondary_nodes
3585
    if not secondary_nodes:
3586
      raise errors.ConfigurationError("No secondary node but using"
3587
                                      " drbd8 disk template")
3588

    
3589
    i_be = self.cfg.GetClusterInfo().FillBE(instance)
3590

    
3591
    target_node = secondary_nodes[0]
3592
    # check memory requirements on the secondary node
3593
    _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3594
                         instance.name, i_be[constants.BE_MEMORY],
3595
                         instance.hypervisor)
3596

    
3597
    # check bridge existance
3598
    brlist = [nic.bridge for nic in instance.nics]
3599
    result = self.rpc.call_bridges_exist(target_node, brlist)
3600
    if result.failed or not result.data:
3601
      raise errors.OpPrereqError("One or more target bridges %s does not"
3602
                                 " exist on destination node '%s'" %
3603
                                 (brlist, target_node))
3604

    
3605
    if not self.op.cleanup:
3606
      _CheckNodeNotDrained(self, target_node)
3607
      result = self.rpc.call_instance_migratable(instance.primary_node,
3608
                                                 instance)
3609
      msg = result.RemoteFailMsg()
3610
      if msg:
3611
        raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3612
                                   msg)
3613

    
3614
    self.instance = instance
3615

    
3616
  def _WaitUntilSync(self):
3617
    """Poll with custom rpc for disk sync.
3618

3619
    This uses our own step-based rpc call.
3620

3621
    """
3622
    self.feedback_fn("* wait until resync is done")
3623
    all_done = False
3624
    while not all_done:
3625
      all_done = True
3626
      result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3627
                                            self.nodes_ip,
3628
                                            self.instance.disks)
3629
      min_percent = 100
3630
      for node, nres in result.items():
3631
        msg = nres.RemoteFailMsg()
3632
        if msg:
3633
          raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3634
                                   (node, msg))
3635
        node_done, node_percent = nres.payload
3636
        all_done = all_done and node_done
3637
        if node_percent is not None:
3638
          min_percent = min(min_percent, node_percent)
3639
      if not all_done:
3640
        if min_percent < 100:
3641
          self.feedback_fn("   - progress: %.1f%%" % min_percent)
3642
        time.sleep(2)
3643

    
3644
  def _EnsureSecondary(self, node):
3645
    """Demote a node to secondary.
3646

3647
    """
3648
    self.feedback_fn("* switching node %s to secondary mode" % node)
3649

    
3650
    for dev in self.instance.disks:
3651
      self.cfg.SetDiskID(dev, node)
3652

    
3653
    result = self.rpc.call_blockdev_close(node, self.instance.name,
3654
                                          self.instance.disks)
3655
    msg = result.RemoteFailMsg()
3656
    if msg:
3657
      raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3658
                               " error %s" % (node, msg))
3659

    
3660
  def _GoStandalone(self):
3661
    """Disconnect from the network.
3662

3663
    """
3664
    self.feedback_fn("* changing into standalone mode")
3665
    result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3666
                                               self.instance.disks)
3667
    for node, nres in result.items():
3668
      msg = nres.RemoteFailMsg()
3669
      if msg:
3670
        raise errors.OpExecError("Cannot disconnect disks node %s,"
3671
                                 " error %s" % (node, msg))
3672

    
3673
  def _GoReconnect(self, multimaster):
3674
    """Reconnect to the network.
3675

3676
    """
3677
    if multimaster:
3678
      msg = "dual-master"
3679
    else:
3680
      msg = "single-master"
3681
    self.feedback_fn("* changing disks into %s mode" % msg)
3682
    result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3683
                                           self.instance.disks,
3684
                                           self.instance.name, multimaster)
3685
    for node, nres in result.items():
3686
      msg = nres.RemoteFailMsg()
3687
      if msg:
3688
        raise errors.OpExecError("Cannot change disks config on node %s,"
3689
                                 " error: %s" % (node, msg))
3690

    
3691
  def _ExecCleanup(self):
3692
    """Try to cleanup after a failed migration.
3693

3694
    The cleanup is done by:
3695
      - check that the instance is running only on one node
3696
        (and update the config if needed)
3697
      - change disks on its secondary node to secondary
3698
      - wait until disks are fully synchronized
3699
      - disconnect from the network
3700
      - change disks into single-master mode
3701
      - wait again until disks are fully synchronized
3702

3703
    """
3704
    instance = self.instance
3705
    target_node = self.target_node
3706
    source_node = self.source_node
3707

    
3708
    # check running on only one node
3709
    self.feedback_fn("* checking where the instance actually runs"
3710
                     " (if this hangs, the hypervisor might be in"
3711
                     " a bad state)")
3712
    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3713
    for node, result in ins_l.items():
3714
      result.Raise()
3715
      if not isinstance(result.data, list):
3716
        raise errors.OpExecError("Can't contact node '%s'" % node)
3717

    
3718
    runningon_source = instance.name in ins_l[source_node].data
3719
    runningon_target = instance.name in ins_l[target_node].data
3720

    
3721
    if runningon_source and runningon_target:
3722
      raise errors.OpExecError("Instance seems to be running on two nodes,"
3723
                               " or the hypervisor is confused. You will have"
3724
                               " to ensure manually that it runs only on one"
3725
                               " and restart this operation.")
3726

    
3727
    if not (runningon_source or runningon_target):
3728
      raise errors.OpExecError("Instance does not seem to be running at all."
3729
                               " In this case, it's safer to repair by"
3730
                               " running 'gnt-instance stop' to ensure disk"
3731
                               " shutdown, and then restarting it.")
3732

    
3733
    if runningon_target:
3734
      # the migration has actually succeeded, we need to update the config
3735
      self.feedback_fn("* instance running on secondary node (%s),"
3736
                       " updating config" % target_node)
3737
      instance.primary_node = target_node
3738
      self.cfg.Update(instance)
3739
      demoted_node = source_node
3740
    else:
3741
      self.feedback_fn("* instance confirmed to be running on its"
3742
                       " primary node (%s)" % source_node)
3743
      demoted_node = target_node
3744

    
3745
    self._EnsureSecondary(demoted_node)
3746
    try:
3747
      self._WaitUntilSync()
3748
    except errors.OpExecError:
3749
      # we ignore here errors, since if the device is standalone, it
3750
      # won't be able to sync
3751
      pass
3752
    self._GoStandalone()
3753
    self._GoReconnect(False)
3754
    self._WaitUntilSync()
3755

    
3756
    self.feedback_fn("* done")
3757

    
3758
  def _RevertDiskStatus(self):
3759
    """Try to revert the disk status after a failed migration.
3760

3761
    """
3762
    target_node = self.target_node
3763
    try:
3764
      self._EnsureSecondary(target_node)
3765
      self._GoStandalone()
3766
      self._GoReconnect(False)
3767
      self._WaitUntilSync()
3768
    except errors.OpExecError, err:
3769
      self.LogWarning("Migration failed and I can't reconnect the"
3770
                      " drives: error '%s'\n"
3771
                      "Please look and recover the instance status" %
3772
                      str(err))
3773

    
3774
  def _AbortMigration(self):
3775
    """Call the hypervisor code to abort a started migration.
3776

3777
    """
3778
    instance = self.instance
3779
    target_node = self.target_node
3780
    migration_info = self.migration_info
3781

    
3782
    abort_result = self.rpc.call_finalize_migration(target_node,
3783
                                                    instance,
3784
                                                    migration_info,
3785
                                                    False)
3786
    abort_msg = abort_result.RemoteFailMsg()
3787
    if abort_msg:
3788
      logging.error("Aborting migration failed on target node %s: %s" %
3789
                    (target_node, abort_msg))
3790
      # Don't raise an exception here, as we stil have to try to revert the
3791
      # disk status, even if this step failed.
3792

    
3793
  def _ExecMigration(self):
3794
    """Migrate an instance.
3795

3796
    The migrate is done by:
3797
      - change the disks into dual-master mode
3798
      - wait until disks are fully synchronized again
3799
      - migrate the instance
3800
      - change disks on the new secondary node (the old primary) to secondary
3801
      - wait until disks are fully synchronized
3802
      - change disks into single-master mode
3803

3804
    """
3805
    instance = self.instance
3806
    target_node = self.target_node
3807
    source_node = self.source_node
3808

    
3809
    self.feedback_fn("* checking disk consistency between source and target")
3810
    for dev in instance.disks:
3811
      if not _CheckDiskConsistency(self, dev, target_node, False):
3812
        raise errors.OpExecError("Disk %s is degraded or not fully"
3813
                                 " synchronized on target node,"
3814
                                 " aborting migrate." % dev.iv_name)
3815

    
3816
    # First get the migration information from the remote node
3817
    result = self.rpc.call_migration_info(source_node, instance)
3818
    msg = result.RemoteFailMsg()
3819
    if msg:
3820
      log_err = ("Failed fetching source migration information from %s: %s" %
3821
                 (source_node, msg))
3822
      logging.error(log_err)
3823
      raise errors.OpExecError(log_err)
3824

    
3825
    self.migration_info = migration_info = result.payload
3826

    
3827
    # Then switch the disks to master/master mode
3828
    self._EnsureSecondary(target_node)
3829
    self._GoStandalone()
3830
    self._GoReconnect(True)
3831
    self._WaitUntilSync()
3832

    
3833
    self.feedback_fn("* preparing %s to accept the instance" % target_node)
3834
    result = self.rpc.call_accept_instance(target_node,
3835
                                           instance,
3836
                                           migration_info,
3837
                                           self.nodes_ip[target_node])
3838

    
3839
    msg = result.RemoteFailMsg()
3840
    if msg:
3841
      logging.error("Instance pre-migration failed, trying to revert"
3842
                    " disk status: %s", msg)
3843
      self._AbortMigration()
3844
      self._RevertDiskStatus()
3845
      raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
3846
                               (instance.name, msg))
3847

    
3848
    self.feedback_fn("* migrating instance to %s" % target_node)
3849
    time.sleep(10)
3850
    result = self.rpc.call_instance_migrate(source_node, instance,
3851
                                            self.nodes_ip[target_node],
3852
                                            self.op.live)
3853
    msg = result.RemoteFailMsg()
3854
    if msg:
3855
      logging.error("Instance migration failed, trying to revert"
3856
                    " disk status: %s", msg)
3857
      self._AbortMigration()
3858
      self._RevertDiskStatus()
3859
      raise errors.OpExecError("Could not migrate instance %s: %s" %
3860
                               (instance.name, msg))
3861
    time.sleep(10)
3862

    
3863
    instance.primary_node = target_node
3864
    # distribute new instance config to the other nodes
3865
    self.cfg.Update(instance)
3866

    
3867
    result = self.rpc.call_finalize_migration(target_node,
3868
                                              instance,
3869
                                              migration_info,
3870
                                              True)
3871
    msg = result.RemoteFailMsg()
3872
    if msg:
3873
      logging.error("Instance migration succeeded, but finalization failed:"
3874
                    " %s" % msg)
3875
      raise errors.OpExecError("Could not finalize instance migration: %s" %
3876
                               msg)
3877

    
3878
    self._EnsureSecondary(source_node)
3879
    self._WaitUntilSync()
3880
    self._GoStandalone()
3881
    self._GoReconnect(False)
3882
    self._WaitUntilSync()
3883

    
3884
    self.feedback_fn("* done")
3885

    
3886
  def Exec(self, feedback_fn):
3887
    """Perform the migration.
3888

3889
    """
3890
    self.feedback_fn = feedback_fn
3891

    
3892
    self.source_node = self.instance.primary_node
3893
    self.target_node = self.instance.secondary_nodes[0]
3894
    self.all_nodes = [self.source_node, self.target_node]
3895
    self.nodes_ip = {
3896
      self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
3897
      self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
3898
      }
3899
    if self.op.cleanup:
3900
      return self._ExecCleanup()
3901
    else:
3902
      return self._ExecMigration()
3903

    
3904

    
3905
def _CreateBlockDev(lu, node, instance, device, force_create,
                    info, force_open):
  """Create a tree of block devices on a given node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  @param lu: the lu on whose behalf we execute
  @param node: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @type force_create: boolean
  @param force_create: whether to force creation of this device; this
      will be changed to True whenever we find a device which has
      CreateOnSecondary() attribute
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution

  """
  if device.CreateOnSecondary():
    force_create = True

  if device.children:
    for child in device.children:
      _CreateBlockDev(lu, node, instance, child, force_create,
                      info, force_open)

  if not force_create:
    return

  _CreateSingleBlockDev(lu, node, instance, device, info, force_open)


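# Illustrative example of the resulting creation order (assuming a DRBD8 disk,
# for which CreateOnSecondary() is true): the recursion above descends into the
# two LV children (data and metadata volumes) and creates them first, and only
# then creates the DRBD device itself via _CreateSingleBlockDev.
#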
def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
  """Create a single block device on a given node.

  This will not recurse over children of the device, so they must be
  created in advance.

  @param lu: the lu on whose behalf we execute
  @param node: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution

  """
  lu.cfg.SetDiskID(device, node)
  result = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, force_open, info)
  msg = result.RemoteFailMsg()
  if msg:
    raise errors.OpExecError("Can't create block device %s on"
                             " node %s for instance %s: %s" %
                             (device, node, instance.name, msg))
  if device.physical_id is None:
    device.physical_id = result.payload


def _GenerateUniqueNames(lu, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results


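# Illustrative example (the unique IDs are made up): for exts
# [".disk0", ".disk1"] this could return
#   ["3f2504e0.disk0", "9b2d7a10.disk1"]
# note that each name gets its own freshly generated ID, not a shared prefix.
#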
def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
                         p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  port = lu.cfg.AllocatePort()
  vgname = lu.cfg.GetVGName()
  shared_secret = lu.cfg.GenerateDRBDSecret()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev


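# Sketch of the object tree built above (sizes and ids are illustrative):
#
#   Disk(LD_DRBD8, size=1024,
#        logical_id=(primary, secondary, port, p_minor, s_minor, secret),
#        iv_name="disk/0",
#        children=[Disk(LD_LV, size=1024, logical_id=(vg, "<id>_data")),
#                  Disk(LD_LV, size=128, logical_id=(vg, "<id>_meta"))])
#
# The fixed 128 MB child is the DRBD metadata volume.
#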
def _GenerateDiskTemplate(lu, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_info,
                          file_storage_dir, file_driver,
                          base_index):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []
  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(lu, [".disk%d" % i
                                      for i in range(disk_count)])
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
                              logical_id=(vgname, names[idx]),
                              iv_name="disk/%d" % disk_index,
                              mode=disk["mode"])
      disks.append(disk_dev)
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node, remote_node] * len(disk_info), instance_name)

    names = []
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
                                               for i in range(disk_count)]):
      names.append(lv_prefix + "_data")
      names.append(lv_prefix + "_meta")
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                      disk["size"], names[idx*2:idx*2+2],
                                      "disk/%d" % disk_index,
                                      minors[idx*2], minors[idx*2+1])
      disk_dev.mode = disk["mode"]
      disks.append(disk_dev)
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
                              iv_name="disk/%d" % disk_index,
                              logical_id=(file_driver,
                                          "%s/disk%d" % (file_storage_dir,
                                                         disk_index)),
                              mode=disk["mode"])
      disks.append(disk_dev)
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks


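# Worked example (input values chosen for illustration): for
# template_name=constants.DT_PLAIN, disk_info=[{"size": 1024, "mode": "rw"}]
# and base_index=0, the result is a single LD_LV disk with iv_name "disk/0"
# and logical_id (vgname, "<unique-id>.disk0"); with DT_DRBD8 the same input
# instead produces one _GenerateDRBD8Branch tree using two allocated minors.
#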
def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


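# Example: for an instance named "instance1.example.com" this yields the text
# "originstname+instance1.example.com", which the creation helpers above
# attach to the instance's volumes as an LVM tag.
#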
def _CreateDisks(lu, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @rtype: boolean
  @return: the success of the creation

  """
  info = _GetInstanceInfoText(instance)
  pnode = instance.primary_node

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)

    if result.failed or not result.data:
      raise errors.OpExecError("Could not connect to node '%s'" % pnode)

    if not result.data[0]:
      raise errors.OpExecError("Failed to create directory '%s'" %
                               file_storage_dir)

  # Note: this needs to be kept in sync with adding of disks in
  # LUSetInstanceParams
  for device in instance.disks:
    logging.info("Creating volume %s for instance %s",
                 device.iv_name, instance.name)
    #HARDCODE
    for node in instance.all_nodes:
      f_create = node == pnode
      _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)


def _RemoveDisks(lu, instance):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should remove
  @rtype: boolean
  @return: the success of the removal

  """
  logging.info("Removing block devices for instance %s", instance.name)

  all_result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(disk, node)
      msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
      if msg:
        lu.LogWarning("Could not remove block device %s on node %s,"
                      " continuing anyway: %s", device.iv_name, node, msg)
        all_result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
                                                 file_storage_dir)
    if result.failed or not result.data:
      logging.error("Could not remove directory '%s'", file_storage_dir)
      all_result = False

  return all_result


def _ComputeDiskSize(disk_template, disks):
  """Compute disk size requirements in the volume group

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: sum(d["size"] for d in disks),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" %  disk_template)

  return req_size_dict[disk_template]


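# Worked example (sizes chosen for illustration): for two disks of 1024 MB and
# 2048 MB, DT_PLAIN requires 1024 + 2048 = 3072 MB in the volume group, while
# DT_DRBD8 requires (1024 + 128) + (2048 + 128) = 3328 MB because of the
# per-disk DRBD metadata; DT_DISKLESS and DT_FILE need no volume group space
# (None).
#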
def _CheckHVParams(lu, nodenames, hvname, hvparams):
  """Hypervisor parameter validation.

  This function abstracts the hypervisor parameter validation to be
  used in both instance create and instance modify.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type nodenames: list
  @param nodenames: the list of nodes on which we should check
  @type hvname: string
  @param hvname: the name of the hypervisor we should use
  @type hvparams: dict
  @param hvparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
                                                  hvname,
                                                  hvparams)
  for node in nodenames:
    info = hvinfo[node]
    if info.offline:
      continue
    msg = info.RemoteFailMsg()
    if msg:
      raise errors.OpPrereqError("Hypervisor parameter validation failed:"
                                 " %s" % msg)


class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disks", "disk_template",
              "mode", "start",
              "wait_for_sync", "ip_check", "nics",
              "hvparams", "beparams"]
  REQ_BGL = False

  def _ExpandNode(self, node):
    """Expands and checks one node name.

    """
    node_full = self.cfg.ExpandNodeName(node)
    if node_full is None:
      raise errors.OpPrereqError("Unknown node %s" % node)
    return node_full

  def ExpandNames(self):
    """ExpandNames for CreateInstance.

    Figure out the right locks for instance creation.

    """
    self.needed_locks = {}

    # set optional parameters to none if they don't exist
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    # cheap checks, mostly valid constants given

    # verify creation mode
    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    if self.op.hypervisor is None:
      self.op.hypervisor = self.cfg.GetHypervisorType()

    cluster = self.cfg.GetClusterInfo()
    enabled_hvs = cluster.enabled_hypervisors
    if self.op.hypervisor not in enabled_hvs:
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
                                 " cluster (%s)" % (self.op.hypervisor,
                                  ",".join(enabled_hvs)))

    # check hypervisor parameter syntax (locally)
    utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
    filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
                                  self.op.hvparams)
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
    hv_type.CheckParameterSyntax(filled_hvp)

    # fill and remember the beparams dict
    utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
    self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                    self.op.beparams)

    #### instance parameters check

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)
    self.op.instance_name = instance_name = hostname1.name

    # this is just a preventive check, but someone might still add this
    # instance in the meantime, and creation will fail at lock-add time
    if instance_name in self.cfg.GetInstanceList():
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    self.add_locks[locking.LEVEL_INSTANCE] = instance_name

    # NIC buildup
    self.nics = []
    for nic in self.op.nics:
      # ip validity checks
      ip = nic.get("ip", None)
      if ip is None or ip.lower() == "none":
        nic_ip = None
      elif ip.lower() == constants.VALUE_AUTO:
        nic_ip = hostname1.ip
      else:
        if not utils.IsValidIP(ip):
          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
                                     " like a valid IP" % ip)
        nic_ip = ip

      # MAC address verification
      mac = nic.get("mac", constants.VALUE_AUTO)
      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
        if not utils.IsValidMac(mac.lower()):
          raise errors.OpPrereqError("Invalid MAC address specified: %s" %
                                     mac)
      # bridge verification
      bridge = nic.get("bridge", None)
      if bridge is None:
        bridge = self.cfg.GetDefBridge()
      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))

    # disk checks/pre-build
    self.disks = []
    for disk in self.op.disks:
      mode = disk.get("mode", constants.DISK_RDWR)
      if mode not in constants.DISK_ACCESS_SET:
        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                   mode)
      size = disk.get("size", None)
      if size is None:
        raise errors.OpPrereqError("Missing disk size")
      try:
        size = int(size)
      except ValueError:
        raise errors.OpPrereqError("Invalid disk size '%s'" % size)
      self.disks.append({"size": size, "mode": mode})

    # used in CheckPrereq for ip ping check
    self.check_ip = hostname1.ip

    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory path not absolute")

    ### Node/iallocator related checks
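    # Note on locking: with an iallocator we do not yet know which nodes the
    # instance will end up on, so all node locks have to be acquired; with an
    # explicit primary (and optional secondary) node only those nodes are
    # locked.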
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.op.pnode = self._ExpandNode(self.op.pnode)
      nodelist = [self.op.pnode]
      if self.op.snode is not None:
        self.op.snode = self._ExpandNode(self.op.snode)
        nodelist.append(self.op.snode)
      self.needed_locks[locking.LEVEL_NODE] = nodelist

    # in case of import lock the source node too
    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)

      if src_path is None:
        self.op.src_path = src_path = self.op.instance_name

      if src_node is None:
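        # without a source node the export has to be searched for on every
        # node, so CheckPrereq needs all node locks in order to query the
        # per-node export lists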
        self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
        self.op.src_node = None
        if os.path.isabs(src_path):
          raise errors.OpPrereqError("Importing an instance from an absolute"
                                     " path requires a source node option.")
      else:
        self.op.src_node = src_node = self._ExpandNode(src_node)
        if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
          self.needed_locks[locking.LEVEL_NODE].append(src_node)
        if not os.path.isabs(src_path):
          self.op.src_path = src_path = \
            os.path.join(constants.EXPORT_DIR, src_path)

    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    nics = [n.ToDict() for n in self.nics]
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.be_full[constants.BE_VCPUS],
                     mem_size=self.be_full[constants.BE_MEMORY],
                     disks=self.disks,
                     nics=nics,
                     hypervisor=self.op.hypervisor,
                     )
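    # mem_size and vcpus are taken from the already-filled beparams so the
    # allocator sees the effective values (cluster defaults plus the
    # overrides given in the opcode), not just the raw opcode parameters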

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 ", ".join(ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["SRC_NODE"] = self.op.src_node
      env["SRC_PATH"] = self.op.src_path
      env["SRC_IMAGES"] = self.src_images

    env.update(_BuildInstanceHookEnv(
      name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.op.start,
      os_type=self.op.os_type,
      memory=self.be_full[constants.BE_MEMORY],
      vcpus=self.be_full[constants.BE_VCPUS],
      nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
      disk_template=self.op.disk_template,
      disks=[(d["size"], d["mode"]) for d in self.disks],
    ))

    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl


  def CheckPrereq(self):
    """Check prerequisites.

    """
    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = self.op.src_node
      src_path = self.op.src_path

      if src_node is None:
        exp_list = self.rpc.call_export_list(
          self.acquired_locks[locking.LEVEL_NODE])
        found = False
        for node in exp_list:
          if not exp_list[node].failed and src_path in exp_list[node].data:
            found = True
            self.op.src_node = src_node = node
            self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
                                                       src_path)
            break
        if not found:
          raise errors.OpPrereqError("No export found for relative path %s" %
                                      src_path)

      _CheckNodeOnline(self, src_node)
      result = self.rpc.call_export_info(src_node, src_path)
      result.Raise()
      if not result.data:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      export_info = result.data
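      # export_info behaves like a ConfigParser-style object: the export
      # section holds global data (format version, OS), while the instance
      # section holds per-instance data (name, disk_count, nic_count,
      # per-disk dump files and NIC MAC addresses), as used below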
      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      # Check that the new instance doesn't have less disks than the export
      instance_disks = len(self.disks)
      export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
      if instance_disks < export_disks:
        raise errors.OpPrereqError("Not enough disks to import."
                                   " (instance: %d, export: %d)" %
                                   (instance_disks, export_disks))

      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      disk_images = []
      for idx in range(export_disks):
        option = 'disk%d_dump' % idx
        if export_info.has_option(constants.INISECT_INS, option):
          # FIXME: are the old os-es, disk sizes, etc. useful?
          export_name = export_info.get(constants.INISECT_INS, option)
          image = os.path.join(src_path, export_name)
          disk_images.append(image)
        else:
          disk_images.append(False)

      self.src_images = disk_images

      old_name = export_info.get(constants.INISECT_INS, 'name')
      # FIXME: int() here could throw a ValueError on broken exports
      exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
      if self.op.instance_name == old_name:
        for idx, nic in enumerate(self.nics):
          if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
            nic_mac_ini = 'nic%d_mac' % idx
            nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)

    # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
    # ip ping checks (we use the same ip that was resolved in ExpandNames)
    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (self.check_ip, self.op.instance_name))

    #### mac address generation
    # By generating here the mac address both the allocator and the hooks get
    # the real final mac address rather than the 'auto' or 'generate' value.
    # There is a race condition between the generation and the instance object
    # creation, which means that we know the mac is valid now, but we're not
    # sure it will be when we actually add the instance. If things go bad
    # adding the instance will abort because of a duplicate mac, and the
    # creation job will fail.
    for nic in self.nics:
      if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
        nic.mac = self.cfg.GenerateMAC()

    #### allocator run

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
    assert self.pnode is not None, \
      "Cannot retrieve locked node %s" % self.op.pnode
    if pnode.offline:
      raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
                                 pnode.name)
    if pnode.drained:
      raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
                                 pnode.name)

    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if self.op.snode is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")
      if self.op.snode == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      _CheckNodeOnline(self, self.op.snode)
      _CheckNodeNotDrained(self, self.op.snode)
      self.secondaries.append(self.op.snode)

    nodenames = [pnode.name] + self.secondaries

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.disks)
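    # req_size can be None, presumably for disk templates that do not consume
    # space in the volume group; in that case the free-space check below is
    # skipped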

    # Check lv size requirements
    if req_size is not None:
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                         self.op.hypervisor)
      for node in nodenames:
        info = nodeinfo[node]
        info.Raise()
        info = info.data
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)

    # os verification
    result = self.rpc.call_os_get(pnode.name, self.op.os_type)
    result.Raise()
    if not isinstance(result.data, objects.OS):
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node"  % self.op.os_type)

    # bridge check on primary node
    bridges = [n.bridge for n in self.nics]
    result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
    result.Raise()
    if not result.data:
      raise errors.OpPrereqError("One of the target bridges '%s' does not"
                                 " exist on destination node '%s'" %
                                 (",".join(bridges), pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.be_full[constants.BE_MEMORY],
                           self.op.hypervisor)

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    ht_kind = self.op.hypervisor
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None
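    # hypervisors listed in HTS_REQ_PORT get a cluster-allocated TCP port for
    # this instance, typically used for the graphical console (e.g. VNC)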

    ##if self.op.vnc_bind_address is None:
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.cfg.GetFileStorageDir(),
                                        string_file_storage_dir, instance))


    disks = _GenerateDiskTemplate(self,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries,
                                  self.disks,
                                  file_storage_dir,
                                  self.op.file_driver,
                                  0)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            nics=self.nics, disks=disks,
                            disk_template=self.op.disk_template,
                            admin_up=False,
                            network_port=network_port,
                            beparams=self.op.beparams,
                            hvparams=self.op.hvparams,
                            hypervisor=self.op.hypervisor,
                            )

    feedback_fn("* creating instance disks...")
    try:
      _CreateDisks(self, iobj)
    except errors.OpExecError:
      self.LogWarning("Device creation failed, reverting...")
      try:
        _RemoveDisks(self, iobj)
      finally:
        self.cfg.ReleaseDRBDMinors(instance)
        raise

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Declare that we don't want to remove the instance lock anymore, as we've
    # added the instance to the config
    del self.remove_locks[locking.LEVEL_INSTANCE]
    # Unlock all the nodes
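    # (for imports the lock on the source node is kept, as the OS import
    # scripts run later in this function still need to read the dump files
    # from it)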
    if self.op.mode == constants.INSTANCE_IMPORT:
      nodes_keep = [self.op.src_node]
      nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
                       if node != self.op.src_node]
      self.context.glm.release(locking.LEVEL_NODE, nodes_release)
      self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
    else:
      self.context.glm.release(locking.LEVEL_NODE)
      del self.acquired_locks[locking.LEVEL_NODE]

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, iobj)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(self, iobj)
      self.cfg.RemoveInstance(iobj.name)
      # Make sure the instance lock gets removed
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        result = self.rpc.call_instance_os_add(pnode_name, iobj)
        msg = result.RemoteFailMsg()
        if msg:
          raise errors.OpExecError("Could not add os for instance %s"
                                   " on node %s: %s" %
                                   (instance, pnode_name, msg))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_images = self.src_images
        cluster_name = self.cfg.GetClusterName()
        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
                                                         src_node, src_images,
                                                         cluster_name)
        import_result.Raise()
        for idx, result in enumerate(import_result.data):
          if not result:
            self.LogWarning("Could not import the image %s for instance"
                            " %s, disk %d, on node %s" %
                            (src_images[idx], instance, idx, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      iobj.admin_up = True
      self.cfg.Update(iobj)
      logging.info("Starting instance %s on node %s", instance, pnode_name)
      feedback_fn("* starting instance...")
      result = self.rpc.call_instance_start(pnode_name, iobj)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not start instance: %s" % msg)


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = self.rpc.call_instance_list([node],
                                             [instance.hypervisor])[node]
    node_insts.Raise()

    if instance.name not in node_insts.data:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logging.debug("Connecting to console of %s on %s", instance.name, node)

    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    cluster = self.cfg.GetClusterInfo()
    # beparams and hvparams are passed separately, to avoid editing the
    # instance and then saving the defaults in the instance itself.
    hvparams = cluster.FillHV(instance)
    beparams = cluster.FillBE(instance)
    console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]
  REQ_BGL = False

  def CheckArguments(self):
    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None
    if not hasattr(self.op, "iallocator"):
      self.op.iallocator = None

    # check for valid parameter combination
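    # - REPLACE_DISK_CHG: exactly one of iallocator / remote_node is required
    # - any other mode (replace on primary/secondary): neither may be given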
    cnt = [self.op.remote_node, self.op.iallocator].count(None)
    if self.op.mode == constants.REPLACE_DISK_CHG:
      if cnt == 2:
        raise errors.OpPrereqError("When changing the secondary either an"
                                   " iallocator script must be used or the"
                                   " new node given")
      elif cnt == 0:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
    else: # not replacing the secondary
      if cnt != 2:
        raise errors.OpPrereqError("The iallocator and new node options can"
                                   " be used only when changing the"
                                   " secondary node")

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    if self.op.iallocator is not None:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    elif self.op.remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.op.remote_node = remote_node
      # Warning: do not remove the locking of the new secondary here
      # unless DRBD8.AddChildren is changed to work in parallel;
      # currently it doesn't since parallel invocations of
      # FindUnusedMinor will conflict
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    # If we're not already locking all nodes in the set we have to declare the
    # instance's primary/secondary nodes.
    if (level == locking.LEVEL_NODE and
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
      self._LockInstancesNodes()

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    self.LogInfo("Selected new secondary for the instance: %s",
                 self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

    if instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
                                 " instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    if self.op.iallocator is not None:
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      raise errors.OpPrereqError("The specified node is already the"
                                 " secondary node of the instance.")

    if self.op.mode == constants.REPLACE_DISK_PRI:
      n1 = self.tgt_node = instance.primary_node
      n2 = self.oth_node = self.sec_node
    elif self.op.mode == constants.REPLACE_DISK_SEC:
      n1 = self.tgt_node = self.sec_node
      n2 = self.oth_node = instance.primary_node
    elif self.op.mode == constants.REPLACE_DISK_CHG:
      n1 = self.new_node = remote_node
      n2 = self.oth_node = instance.primary_node
      self.tgt_node = self.sec_node
      _CheckNodeNotDrained(self, remote_node)
    else:
      raise errors.ProgrammerError("Unhandled disk replace mode")

    _CheckNodeOnline(self, n1)
    _CheckNodeOnline(self, n2)

    if not self.op.disks:
      self.op.disks = range(len(instance.disks))

    for disk_idx in self.op.disks:
      instance.FindDisk(disk_idx)

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for dbrd8.
4995

4996
    The algorithm for replace is quite complicated:
4997

4998
      1. for each disk to be replaced:
4999

5000
        1. create new LVs on the target node with unique names
5001
        1. detach old LVs from the drbd device
5002
        1. rename old LVs to name_replaced.<time_t>
5003
        1. rename new LVs to old LVs
5004
        1. attach the new LVs (with the old names now) to the drbd device
5005

5006
      1. wait for sync across all devices
5007

5008
      1. for each modified disk:
5009

5010
        1. remove old LVs (which have the name name_replaces.<time_t>)
5011

5012
    Failures are not very well handled.
5013

5014
    """
5015
    steps_total = 6
5016
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5017
    instance = self.instance
5018
    iv_names = {}
5019
    vgname = self.cfg.GetVGName()
5020
    # start of work
5021
    cfg = self.cfg
5022
    tgt_node = self.tgt_node
5023
    oth_node = self.oth_node
5024

    
5025
    # Step: check device activation
5026
    self.proc.LogStep(1, steps_total, "check device existence")
5027
    info("checking volume groups")
5028
    my_vg = cfg.GetVGName()
5029
    results = self.rpc.call_vg_list([oth_node, tgt_node])
5030
    if not results:
5031
      raise errors.OpExecError("Can't list volume groups on the nodes")
5032
    for node in oth_node, tgt_node:
5033
      res = results[node]
5034
      if res.failed or not res.data or my_vg not in res.data:
5035
        raise errors.OpExecError("Volume group '%s' not found on %s" %
5036
                                 (my_vg, node))
5037
    for idx, dev in enumerate(instance.disks):
5038
      if idx not in self.op.disks:
5039
        continue
5040
      for node in tgt_node, oth_node:
5041
        info("checking disk/%d on %s" % (idx, node))
5042
        cfg.SetDiskID(dev, node)
5043
        result = self.rpc.call_blockdev_find(node, dev)
5044
        msg = result.RemoteFailMsg()
5045
        if not msg and not result.payload:
5046
          msg = "disk not found"
5047
        if msg:
5048
          raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5049
                                   (idx, node, msg))
5050

    
5051
    # Step: check other node consistency
5052
    self.proc.LogStep(2, steps_total, "check peer consistency")
5053
    for idx, dev in enumerate(instance.disks):
5054
      if idx not in self.op.disks:
5055
        continue
5056
      info("checking disk/%d consistency on %s" % (idx, oth_node))
5057
      if not _CheckDiskConsistency(self, dev, oth_node,
5058
                                   oth_node==instance.primary_node):
5059
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5060
                                 " to replace disks on this node (%s)" %
5061
                                 (oth_node, tgt_node))
5062

    
5063
    # Step: create new storage
5064
    self.proc.LogStep(3, steps_total, "allocate new storage")
5065
    for idx, dev in enumerate(instance.disks):
5066
      if idx not in self.op.disks:
5067
        continue
5068
      size = dev.size
5069
      cfg.SetDiskID(dev, tgt_node)
5070
      lv_names = [".disk%d_%s" % (idx, suf)
5071
                  for suf in ["data", "meta"]]
5072
      names = _GenerateUniqueNames(self, lv_names)
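      # the first LV holds the data and matches the current disk size; the
      # second one is the metadata volume used by DRBD, created here with a
      # fixed size of 128 MiB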
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # we pass force_create=True to force the LVM creation
      for new_lv in new_lvs:
        _CreateBlockDev(self, tgt_node, instance, new_lv, True,
                        _GetInstanceInfoText(instance), False)

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        result = self.rpc.call_blockdev_find(tgt_node, to_ren)
        if not result.RemoteFailMsg() and result.payload:
          # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
      if result.failed or not result.data:
        for new_lv in new_lvs:
          msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
          if msg:
            warning("Can't rollback device %s: %s", dev, msg,
                    hint="cleanup manually the unused logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(self, instance, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      result = self.rpc.call_blockdev_find(instance.primary_node, dev)
      msg = result.RemoteFailMsg()
      if not msg and not result.payload:
        msg = "disk not found"
      if msg:
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
                                 (name, msg))
      if result.payload[5]:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
        if msg:
          warning("Can't remove old LV: %s" % msg,
                  hint="manually remove unused LVs")
          continue

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node
    nodes_ip = {
      old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
      new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
      pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
      }

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = self.rpc.call_vg_list([pri_node, new_node])
    for node in pri_node, new_node:
      res = results[node]
      if res.failed or not res.data or my_vg not in res.data:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      info("checking disk/%d on %s" % (idx, pri_node))
      cfg.SetDiskID(dev, pri_node)
      result = self.rpc.call_blockdev_find(pri_node, dev)
      msg = result.RemoteFailMsg()
      if not msg and not result.payload:
        msg = "disk not found"
      if msg:
        raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
                                 (idx, pri_node, msg))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      info("checking disk/%d consistency on %s" % (idx, pri_node))
      if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for idx, dev in enumerate(instance.disks):
      info("adding new local storage on %s for disk/%d" %
           (new_node, idx))
      # we pass force_create=True to force LVM creation
      for new_lv in dev.children:
        _CreateBlockDev(self, new_node, instance, new_lv, True,
                        _GetInstanceInfoText(instance), False)

    # Step 4: drbd minors and drbd setup changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
                                   instance.name)
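    # one new DRBD minor is reserved on the new node for every disk of the
    # instance; they have to be released again if anything below fails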
    logging.debug("Allocated minors %s" % (minors,))
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
      size = dev.size
      info("activating a new drbd on %s for disk/%d" % (new_node, idx))
      # create new devices on new_node; note that we create two IDs:
      # one without port, so the drbd will be activated without
      # networking information on the new node at this stage, and one
      # with network, for the latter activation in step 4
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
      if pri_node == o_node1:
        p_minor = o_minor1
      else:
        p_minor = o_minor2

      new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
      new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)

      iv_names[idx] = (dev, dev.children, new_net_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_net_id)
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=new_alone_id,
                              children=dev.children)
      try:
        _CreateSingleBlockDev(self, new_node, instance, new_drbd,
                              _GetInstanceInfoText(instance), False)
      except errors.BlockDeviceError:
        self.cfg.ReleaseDRBDMinors(instance.name)
        raise

    for idx, dev in enumerate(instance.disks):
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for disk/%d on old node" % idx)
      cfg.SetDiskID(dev, old_node)
      msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
      if msg:
        warning("Failed to shutdown drbd for disk/%d on old node: %s" %
                (idx, msg),
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
                                               instance.disks)[pri_node]

    msg = result.RemoteFailMsg()
    if msg:
      # detaches didn't succeed (unlikely)
      self.cfg.ReleaseDRBDMinors(instance.name)
      raise errors.OpExecError("Can't detach the disks from the network on"
                               " old node: %s" % (msg,))

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev, _, new_logical_id in iv_names.itervalues():
      dev.logical_id = new_logical_id
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
                                           instance.disks, instance.name,
                                           False)
    for to_node, to_result in result.items():
      msg = to_result.RemoteFailMsg()
      if msg:
        warning("can't attach drbd disks on node %s: %s", to_node, msg,
                hint="please do a gnt-instance info to see the"
                " status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(self, instance, unlock=True)

    # so check manually all the devices
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      result = self.rpc.call_blockdev_find(pri_node, dev)
      msg = result.RemoteFailMsg()
      if not msg and not result.payload:
        msg = "disk not found"
      if msg:
        raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
                                 (idx, msg))
      if result.payload[5]:
        raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
      info("remove logical volumes for disk/%d" % idx)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
        if msg:
          warning("Can't remove LV on old secondary: %s", msg,
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance

    # Activate the instance disks if we're replacing them on a down instance
    if not instance.admin_up:
      _StartInstanceDisks(self, instance, True)

    if self.op.mode == constants.REPLACE_DISK_CHG:
      fn = self._ExecD8Secondary
    else:
      fn = self._ExecD8DiskOnly

    ret = fn(feedback_fn)

    # Deactivate the instance disks if we're replacing them on a down instance
    if not instance.admin_up:
      _SafeShutdownInstanceDisks(self, instance)

    return ret


class LUGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    nodenames = list(instance.all_nodes)
    for node in nodenames:
      _CheckNodeOnline(self, node)


    self.instance = instance

    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing.")

    self.disk = instance.FindDisk(self.op.disk)
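    # the requested growth must fit into the free space of the volume group
    # on every node that holds this disk (the primary and, for DRBD, the
    # secondary as well)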

    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                       instance.hypervisor)
    for node in nodenames:
      info = nodeinfo[node]
      if info.failed or not info.data:
        raise errors.OpPrereqError("Cannot get current information"
                                   " from node '%s'" % node)
      vg_free = info.data.get('vg_free', None)
      if not isinstance(vg_free, int):
        raise errors.OpPrereqError("Can't compute free disk space on"
                                   " node %s" % node)
      if self.op.amount > vg_free:
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                   " %d MiB available, %d MiB required" %
                                   (node, vg_free, self.op.amount))

  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    disk = self.disk
    for node in instance.all_nodes:
      self.cfg.SetDiskID(disk, node)
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Grow request failed to node %s: %s" %
                                 (node, msg))
    disk.RecordGrow(self.op.amount)
    self.cfg.Update(instance)
    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, instance)
      if disk_abort:
        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
                             " status.\nPlease check the instance.")


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances", "static"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = self.cfg.ExpandInstanceName(name)
        if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" % name)
        self.wanted_names.append(full_name)
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
    else:
      self.wanted_names = None
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]
    return

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    static = self.op.static
    if not static:
      self.cfg.SetDiskID(dev, instance.primary_node)
      dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
      if dev_pstatus.offline:
        dev_pstatus = None
      else:
        msg = dev_pstatus.RemoteFailMsg()
        if msg:
          raise errors.OpExecError("Can't compute disk status for %s: %s" %
                                   (instance.name, msg))
        dev_pstatus = dev_pstatus.payload
    else:
      dev_pstatus = None

    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode and not static:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
      if dev_sstatus.offline:
        dev_sstatus = None
      else:
        msg = dev_sstatus.RemoteFailMsg()
        if msg:
          raise errors.OpExecError("Can't compute disk status for %s: %s" %
                                   (instance.name, msg))
        dev_sstatus = dev_sstatus.payload
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
5583
      "iv_name": dev.iv_name,
5584
      "dev_type": dev.dev_type,
5585
      "logical_id": dev.logical_id,
5586
      "physical_id": dev.physical_id,
5587
      "pstatus": dev_pstatus,
5588
      "sstatus": dev_sstatus,
5589
      "children": dev_children,
5590
      "mode": dev.mode,
5591
      }
5592

    
5593
    return data
5594

    
5595
  def Exec(self, feedback_fn):
5596
    """Gather and return data"""
5597
    result = {}
5598

    
5599
    cluster = self.cfg.GetClusterInfo()
5600

    
5601
    for instance in self.wanted_instances:
5602
      if not self.op.static:
5603
        remote_info = self.rpc.call_instance_info(instance.primary_node,
5604
                                                  instance.name,
5605
                                                  instance.hypervisor)
5606
        remote_info.Raise()
5607
        remote_info = remote_info.data
5608
        if remote_info and "state" in remote_info:
5609
          remote_state = "up"
5610
        else:
5611
          remote_state = "down"
5612
      else:
5613
        remote_state = None
5614
      if instance.admin_up:
5615
        config_state = "up"
5616
      else:
5617
        config_state = "down"
5618

    
5619
      disks = [self._ComputeDiskStatus(instance, None, device)
5620
               for device in instance.disks]
5621

    
5622
      idict = {
5623
        "name": instance.name,
5624
        "config_state": config_state,
5625
        "run_state": remote_state,
5626
        "pnode": instance.primary_node,
5627
        "snodes": instance.secondary_nodes,
5628
        "os": instance.os,
5629
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5630
        "disks": disks,
5631
        "hypervisor": instance.hypervisor,
5632
        "network_port": instance.network_port,
5633
        "hv_instance": instance.hvparams,
5634
        "hv_actual": cluster.FillHV(instance),
5635
        "be_instance": instance.beparams,
5636
        "be_actual": cluster.FillBE(instance),
5637
        }
5638

    
5639
      result[instance.name] = idict
5640

    
5641
    return result
5642

    
5643

    
5644
class LUSetInstanceParams(LogicalUnit):
5645
  """Modifies an instances's parameters.
5646

5647
  """
5648
  HPATH = "instance-modify"
5649
  HTYPE = constants.HTYPE_INSTANCE
5650
  _OP_REQP = ["instance_name"]
5651
  REQ_BGL = False
5652

    
5653
  def CheckArguments(self):
5654
    if not hasattr(self.op, 'nics'):
5655
      self.op.nics = []
5656
    if not hasattr(self.op, 'disks'):
5657
      self.op.disks = []
5658
    if not hasattr(self.op, 'beparams'):
5659
      self.op.beparams = {}
5660
    if not hasattr(self.op, 'hvparams'):
5661
      self.op.hvparams = {}
5662
    self.op.force = getattr(self.op, "force", False)
5663
    if not (self.op.nics or self.op.disks or
5664
            self.op.hvparams or self.op.beparams):
5665
      raise errors.OpPrereqError("No changes submitted")
5666

    
5667
    # Disk validation
5668
    disk_addremove = 0
5669
    for disk_op, disk_dict in self.op.disks:
5670
      if disk_op == constants.DDM_REMOVE:
5671
        disk_addremove += 1
5672
        continue
5673
      elif disk_op == constants.DDM_ADD:
5674
        disk_addremove += 1
5675
      else:
5676
        if not isinstance(disk_op, int):
5677
          raise errors.OpPrereqError("Invalid disk index")
5678
      if disk_op == constants.DDM_ADD:
5679
        mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5680
        if mode not in constants.DISK_ACCESS_SET:
5681
          raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5682
        size = disk_dict.get('size', None)
5683
        if size is None:
5684
          raise errors.OpPrereqError("Required disk parameter size missing")
5685
        try:
5686
          size = int(size)
5687
        except ValueError, err:
5688
          raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5689
                                     str(err))
5690
        disk_dict['size'] = size
5691
      else:
5692
        # modification of disk
5693
        if 'size' in disk_dict:
5694
          raise errors.OpPrereqError("Disk size change not possible, use"
5695
                                     " grow-disk")
5696

    
5697
    if disk_addremove > 1:
5698
      raise errors.OpPrereqError("Only one disk add or remove operation"
5699
                                 " supported at a time")
5700

    
5701
    # NIC validation
5702
    nic_addremove = 0
5703
    for nic_op, nic_dict in self.op.nics:
5704
      if nic_op == constants.DDM_REMOVE:
5705
        nic_addremove += 1
5706
        continue
5707
      elif nic_op == constants.DDM_ADD:
5708
        nic_addremove += 1
5709
      else:
5710
        if not isinstance(nic_op, int):
5711
          raise errors.OpPrereqError("Invalid nic index")
5712

    
5713
      # nic_dict should be a dict
5714
      nic_ip = nic_dict.get('ip', None)
5715
      if nic_ip is not None:
5716
        if nic_ip.lower() == constants.VALUE_NONE:
5717
          nic_dict['ip'] = None
5718
        else:
5719
          if not utils.IsValidIP(nic_ip):
5720
            raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5721

    
5722
      if nic_op == constants.DDM_ADD:
5723
        nic_bridge = nic_dict.get('bridge', None)
5724
        if nic_bridge is None:
5725
          nic_dict['bridge'] = self.cfg.GetDefBridge()
5726
        nic_mac = nic_dict.get('mac', None)
5727
        if nic_mac is None:
5728
          nic_dict['mac'] = constants.VALUE_AUTO
5729

    
5730
      if 'mac' in nic_dict:
5731
        nic_mac = nic_dict['mac']
5732
        if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5733
          if not utils.IsValidMac(nic_mac):
5734
            raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5735
        if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5736
          raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5737
                                     " modifying an existing nic")
5738

    
5739
    if nic_addremove > 1:
5740
      raise errors.OpPrereqError("Only one NIC add or remove operation"
5741
                                 " supported at a time")
5742

    
5743
  def ExpandNames(self):
5744
    self._ExpandAndLockInstance()
5745
    self.needed_locks[locking.LEVEL_NODE] = []
5746
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5747

    
5748
  def DeclareLocks(self, level):
5749
    if level == locking.LEVEL_NODE:
5750
      self._LockInstancesNodes()
5751

    
5752
  def BuildHooksEnv(self):
5753
    """Build hooks env.
5754

5755
    This runs on the master, primary and secondaries.
5756

5757
    """
5758
    args = dict()
5759
    if constants.BE_MEMORY in self.be_new:
5760
      args['memory'] = self.be_new[constants.BE_MEMORY]
5761
    if constants.BE_VCPUS in self.be_new:
5762
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
5763
    # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
5764
    # information at all.
5765
    if self.op.nics:
5766
      args['nics'] = []
5767
      nic_override = dict(self.op.nics)
5768
      for idx, nic in enumerate(self.instance.nics):
5769
        if idx in nic_override:
5770
          this_nic_override = nic_override[idx]
5771
        else:
5772
          this_nic_override = {}
5773
        if 'ip' in this_nic_override:
5774
          ip = this_nic_override['ip']
5775
        else:
5776
          ip = nic.ip
5777
        if 'bridge' in this_nic_override:
5778
          bridge = this_nic_override['bridge']
5779
        else:
5780
          bridge = nic.bridge
5781
        if 'mac' in this_nic_override:
5782
          mac = this_nic_override['mac']
5783
        else:
5784
          mac = nic.mac
5785
        args['nics'].append((ip, bridge, mac))
5786
      if constants.DDM_ADD in nic_override:
5787
        ip = nic_override[constants.DDM_ADD].get('ip', None)
5788
        bridge = nic_override[constants.DDM_ADD]['bridge']
5789
        mac = nic_override[constants.DDM_ADD]['mac']
5790
        args['nics'].append((ip, bridge, mac))
5791
      elif constants.DDM_REMOVE in nic_override:
5792
        del args['nics'][-1]
5793

    
5794
    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5795
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
5796
    return env, nl, nl
5797

    
5798
  def CheckPrereq(self):
5799
    """Check prerequisites.
5800

5801
    This only checks the instance list against the existing names.
5802

5803
    """
5804
    force = self.force = self.op.force
5805

    
5806
    # checking the new params on the primary/secondary nodes
5807

    
5808
    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5809
    assert self.instance is not None, \
5810
      "Cannot retrieve locked instance %s" % self.op.instance_name
5811
    pnode = instance.primary_node
5812
    nodelist = list(instance.all_nodes)
5813

    
5814
    # hvparams processing
5815
    if self.op.hvparams:
5816
      i_hvdict = copy.deepcopy(instance.hvparams)
5817
      for key, val in self.op.hvparams.iteritems():
5818
        if val == constants.VALUE_DEFAULT:
5819
          try:
5820
            del i_hvdict[key]
5821
          except KeyError:
5822
            pass
5823
        else:
5824
          i_hvdict[key] = val
5825
      cluster = self.cfg.GetClusterInfo()
5826
      utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
5827
      hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5828
                                i_hvdict)
5829
      # local check
5830
      hypervisor.GetHypervisor(
5831
        instance.hypervisor).CheckParameterSyntax(hv_new)
5832
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5833
      self.hv_new = hv_new # the new actual values
5834
      self.hv_inst = i_hvdict # the new dict (without defaults)
5835
    else:
5836
      self.hv_new = self.hv_inst = {}
5837

    
5838
    # beparams processing
5839
    if self.op.beparams:
5840
      i_bedict = copy.deepcopy(instance.beparams)
5841
      for key, val in self.op.beparams.iteritems():
5842
        if val == constants.VALUE_DEFAULT:
5843
          try:
5844
            del i_bedict[key]
5845
          except KeyError:
5846
            pass
5847
        else:
5848
          i_bedict[key] = val
5849
      cluster = self.cfg.GetClusterInfo()
5850
      utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
5851
      be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
5852
                                i_bedict)
5853
      self.be_new = be_new # the new actual values
5854
      self.be_inst = i_bedict # the new dict (without defaults)
5855
    else:
5856
      self.be_new = self.be_inst = {}
5857

    
5858
    self.warn = []
5859

    
5860
    if constants.BE_MEMORY in self.op.beparams and not self.force:
5861
      mem_check_list = [pnode]
5862
      if be_new[constants.BE_AUTO_BALANCE]:
5863
        # either we changed auto_balance to yes or it was from before
5864
        mem_check_list.extend(instance.secondary_nodes)
5865
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
5866
                                                  instance.hypervisor)
5867
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
5868
                                         instance.hypervisor)
5869
      if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
5870
        # Assume the primary node is unreachable and go ahead
5871
        self.warn.append("Can't get info from primary node %s" % pnode)
5872
      else:
5873
        if not instance_info.failed and instance_info.data:
5874
          current_mem = instance_info.data['memory']
5875
        else:
5876
          # Assume instance not running
5877
          # (there is a slight race condition here, but it's not very probable,
5878
          # and we have no other way to check)
5879
          current_mem = 0
5880
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
5881
                    nodeinfo[pnode].data['memory_free'])
5882
        if miss_mem > 0:
5883
          raise errors.OpPrereqError("This change will prevent the instance"
5884
                                     " from starting, due to %d MB of memory"
5885
                                     " missing on its primary node" % miss_mem)
5886

    
5887
      if be_new[constants.BE_AUTO_BALANCE]:
5888
        for node, nres in nodeinfo.iteritems():
5889
          if node not in instance.secondary_nodes:
5890
            continue
5891
          if nres.failed or not isinstance(nres.data, dict):
5892
            self.warn.append("Can't get info from secondary node %s" % node)
5893
          elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
5894
            self.warn.append("Not enough memory to failover instance to"
5895
                             " secondary node %s" % node)
5896

    
5897
    # NIC processing
5898
    for nic_op, nic_dict in self.op.nics:
5899
      if nic_op == constants.DDM_REMOVE:
5900
        if not instance.nics:
5901
          raise errors.OpPrereqError("Instance has no NICs, cannot remove")
5902
        continue
5903
      if nic_op != constants.DDM_ADD:
5904
        # an existing nic
5905
        if nic_op < 0 or nic_op >= len(instance.nics):
5906
          raise errors.OpPrereqError("Invalid NIC index %s, valid values"
5907
                                     " are 0 to %d" %
5908
                                     (nic_op, len(instance.nics)))
5909
      if 'bridge' in nic_dict:
5910
        nic_bridge = nic_dict['bridge']
5911
        if nic_bridge is None:
5912
          raise errors.OpPrereqError('Cannot set the nic bridge to None')
5913
        if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
5914
          msg = ("Bridge '%s' doesn't exist on one of"
5915
                 " the instance nodes" % nic_bridge)
5916
          if self.force:
5917
            self.warn.append(msg)
5918
          else:
5919
            raise errors.OpPrereqError(msg)
5920
      if 'mac' in nic_dict:
5921
        nic_mac = nic_dict['mac']
5922
        if nic_mac is None:
5923
          raise errors.OpPrereqError('Cannot set the nic mac to None')
5924
        elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5925
          # otherwise generate the mac
5926
          nic_dict['mac'] = self.cfg.GenerateMAC()
5927
        else:
5928
          # or validate/reserve the current one
5929
          if self.cfg.IsMacInUse(nic_mac):
5930
            raise errors.OpPrereqError("MAC address %s already in use"
5931
                                       " in cluster" % nic_mac)
5932

    
5933
    # DISK processing
5934
    if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
5935
      raise errors.OpPrereqError("Disk operations not supported for"
5936
                                 " diskless instances")
5937
    for disk_op, disk_dict in self.op.disks:
5938
      if disk_op == constants.DDM_REMOVE:
5939
        if len(instance.disks) == 1:
5940
          raise errors.OpPrereqError("Cannot remove the last disk of"
5941
                                     " an instance")
5942
        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
5943
        ins_l = ins_l[pnode]
5944
        if ins_l.failed or not isinstance(ins_l.data, list):
5945
          raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
5946
        if instance.name in ins_l.data:
5947
          raise errors.OpPrereqError("Instance is running, can't remove"
5948
                                     " disks.")
5949

    
5950
      if (disk_op == constants.DDM_ADD and
5951
          len(instance.nics) >= constants.MAX_DISKS):
5952
        raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
5953
                                   " add more" % constants.MAX_DISKS)
5954
      if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
5955
        # an existing disk
5956
        if disk_op < 0 or disk_op >= len(instance.disks):
5957
          raise errors.OpPrereqError("Invalid disk index %s, valid values"
5958
                                     " are 0 to %d" %
5959
                                     (disk_op, len(instance.disks)))
5960

    
5961
    return
5962

    
5963
  def Exec(self, feedback_fn):
5964
    """Modifies an instance.
5965

5966
    All parameters take effect only at the next restart of the instance.
5967

5968
    """
5969
    # Process here the warnings from CheckPrereq, as we don't have a
5970
    # feedback_fn there.
5971
    for warn in self.warn:
5972
      feedback_fn("WARNING: %s" % warn)
5973

    
5974
    result = []
5975
    instance = self.instance
5976
    # disk changes
5977
    for disk_op, disk_dict in self.op.disks:
5978
      if disk_op == constants.DDM_REMOVE:
5979
        # remove the last disk
5980
        device = instance.disks.pop()
5981
        device_idx = len(instance.disks)
5982
        for node, disk in device.ComputeNodeTree(instance.primary_node):
5983
          self.cfg.SetDiskID(disk, node)
5984
          msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
5985
          if msg:
5986
            self.LogWarning("Could not remove disk/%d on node %s: %s,"
5987
                            " continuing anyway", device_idx, node, msg)
5988
        result.append(("disk/%d" % device_idx, "remove"))
5989
      elif disk_op == constants.DDM_ADD:
5990
        # add a new disk
5991
        if instance.disk_template == constants.DT_FILE:
5992
          file_driver, file_path = instance.disks[0].logical_id
5993
          file_path = os.path.dirname(file_path)
5994
        else:
5995
          file_driver = file_path = None
5996
        disk_idx_base = len(instance.disks)
5997
        new_disk = _GenerateDiskTemplate(self,
5998
                                         instance.disk_template,
5999
                                         instance.name, instance.primary_node,
6000
                                         instance.secondary_nodes,
6001
                                         [disk_dict],
6002
                                         file_path,
6003
                                         file_driver,
6004
                                         disk_idx_base)[0]
6005
        instance.disks.append(new_disk)
6006
        info = _GetInstanceInfoText(instance)
6007

    
6008
        logging.info("Creating volume %s for instance %s",
6009
                     new_disk.iv_name, instance.name)
6010
        # Note: this needs to be kept in sync with _CreateDisks
6011
        #HARDCODE
6012
        for node in instance.all_nodes:
6013
          f_create = node == instance.primary_node
6014
          try:
6015
            _CreateBlockDev(self, node, instance, new_disk,
6016
                            f_create, info, f_create)
6017
          except errors.OpExecError, err:
6018
            self.LogWarning("Failed to create volume %s (%s) on"
6019
                            " node %s: %s",
6020
                            new_disk.iv_name, new_disk, node, err)
6021
        result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6022
                       (new_disk.size, new_disk.mode)))
6023
      else:
6024
        # change a given disk
6025
        instance.disks[disk_op].mode = disk_dict['mode']
6026
        result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6027
    # NIC changes
6028
    for nic_op, nic_dict in self.op.nics:
6029
      if nic_op == constants.DDM_REMOVE:
6030
        # remove the last nic
6031
        del instance.nics[-1]
6032
        result.append(("nic.%d" % len(instance.nics), "remove"))
6033
      elif nic_op == constants.DDM_ADD:
6034
        # mac and bridge should be set, by now
6035
        mac = nic_dict['mac']
6036
        bridge = nic_dict['bridge']
6037
        new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
6038
                              bridge=bridge)
6039
        instance.nics.append(new_nic)
6040
        result.append(("nic.%d" % (len(instance.nics) - 1),
6041
                       "add:mac=%s,ip=%s,bridge=%s" %
6042
                       (new_nic.mac, new_nic.ip, new_nic.bridge)))
6043
      else:
6044
        # change a given nic
6045
        for key in 'mac', 'ip', 'bridge':
6046
          if key in nic_dict:
6047
            setattr(instance.nics[nic_op], key, nic_dict[key])
6048
            result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
6049

    
6050
    # hvparams changes
6051
    if self.op.hvparams:
6052
      instance.hvparams = self.hv_inst
6053
      for key, val in self.op.hvparams.iteritems():
6054
        result.append(("hv/%s" % key, val))
6055

    
6056
    # beparams changes
6057
    if self.op.beparams:
6058
      instance.beparams = self.be_inst
6059
      for key, val in self.op.beparams.iteritems():
6060
        result.append(("be/%s" % key, val))
6061

    
6062
    self.cfg.Update(instance)
6063

    
6064
    return result
6065

    
6066

    
6067
class LUQueryExports(NoHooksLU):
6068
  """Query the exports list
6069

6070
  """
6071
  _OP_REQP = ['nodes']
6072
  REQ_BGL = False
6073

    
6074
  def ExpandNames(self):
6075
    self.needed_locks = {}
6076
    self.share_locks[locking.LEVEL_NODE] = 1
6077
    if not self.op.nodes:
6078
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6079
    else:
6080
      self.needed_locks[locking.LEVEL_NODE] = \
6081
        _GetWantedNodes(self, self.op.nodes)
6082

    
6083
  def CheckPrereq(self):
6084
    """Check prerequisites.
6085

6086
    """
6087
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6088

    
6089
  def Exec(self, feedback_fn):
6090
    """Compute the list of all the exported system images.
6091

6092
    @rtype: dict
6093
    @return: a dictionary with the structure node->(export-list)
6094
        where export-list is a list of the instances exported on
6095
        that node.
6096

6097
    """
6098
    rpcresult = self.rpc.call_export_list(self.nodes)
6099
    result = {}
6100
    for node in rpcresult:
6101
      if rpcresult[node].failed:
6102
        result[node] = False
6103
      else:
6104
        result[node] = rpcresult[node].data
6105

    
6106
    return result
6107

    
6108

    
6109
class LUExportInstance(LogicalUnit):
6110
  """Export an instance to an image in the cluster.
6111

6112
  """
6113
  HPATH = "instance-export"
6114
  HTYPE = constants.HTYPE_INSTANCE
6115
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
6116
  REQ_BGL = False
6117

    
6118
  def ExpandNames(self):
6119
    self._ExpandAndLockInstance()
6120
    # FIXME: lock only instance primary and destination node
6121
    #
6122
    # Sad but true, for now we have do lock all nodes, as we don't know where
6123
    # the previous export might be, and and in this LU we search for it and
6124
    # remove it from its current node. In the future we could fix this by:
6125
    #  - making a tasklet to search (share-lock all), then create the new one,
6126
    #    then one to remove, after
6127
    #  - removing the removal operation altoghether
6128
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6129

    
6130
  def DeclareLocks(self, level):
6131
    """Last minute lock declaration."""
6132
    # All nodes are locked anyway, so nothing to do here.
6133

    
6134
  def BuildHooksEnv(self):
6135
    """Build hooks env.
6136

6137
    This will run on the master, primary node and target node.
6138

6139
    """
6140
    env = {
6141
      "EXPORT_NODE": self.op.target_node,
6142
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6143
      }
6144
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6145
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6146
          self.op.target_node]
6147
    return env, nl, nl
6148

    
6149
  def CheckPrereq(self):
6150
    """Check prerequisites.
6151

6152
    This checks that the instance and node names are valid.
6153

6154
    """
6155
    instance_name = self.op.instance_name
6156
    self.instance = self.cfg.GetInstanceInfo(instance_name)
6157
    assert self.instance is not None, \
6158
          "Cannot retrieve locked instance %s" % self.op.instance_name
6159
    _CheckNodeOnline(self, self.instance.primary_node)
6160

    
6161
    self.dst_node = self.cfg.GetNodeInfo(
6162
      self.cfg.ExpandNodeName(self.op.target_node))
6163

    
6164
    if self.dst_node is None:
6165
      # This is wrong node name, not a non-locked node
6166
      raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6167
    _CheckNodeOnline(self, self.dst_node.name)
6168
    _CheckNodeNotDrained(self, self.dst_node.name)
6169

    
6170
    # instance disk type verification
6171
    for disk in self.instance.disks:
6172
      if disk.dev_type == constants.LD_FILE:
6173
        raise errors.OpPrereqError("Export not supported for instances with"
6174
                                   " file-based disks")
6175

    
6176
  def Exec(self, feedback_fn):
6177
    """Export an instance to an image in the cluster.
6178

6179
    """
6180
    instance = self.instance
6181
    dst_node = self.dst_node
6182
    src_node = instance.primary_node
6183
    if self.op.shutdown:
6184
      # shutdown the instance, but not the disks
6185
      result = self.rpc.call_instance_shutdown(src_node, instance)
6186
      msg = result.RemoteFailMsg()
6187
      if msg:
6188
        raise errors.OpExecError("Could not shutdown instance %s on"
6189
                                 " node %s: %s" %
6190
                                 (instance.name, src_node, msg))
6191

    
6192
    vgname = self.cfg.GetVGName()
6193

    
6194
    snap_disks = []
6195

    
6196
    # set the disks ID correctly since call_instance_start needs the
6197
    # correct drbd minor to create the symlinks
6198
    for disk in instance.disks:
6199
      self.cfg.SetDiskID(disk, src_node)
6200

    
6201
    try:
6202
      for disk in instance.disks:
6203
        # new_dev_name will be a snapshot of an lvm leaf of the one we passed
6204
        new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
6205
        if new_dev_name.failed or not new_dev_name.data:
6206
          self.LogWarning("Could not snapshot block device %s on node %s",
6207
                          disk.logical_id[1], src_node)
6208
          snap_disks.append(False)
6209
        else:
6210
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6211
                                 logical_id=(vgname, new_dev_name.data),
6212
                                 physical_id=(vgname, new_dev_name.data),
6213
                                 iv_name=disk.iv_name)
6214
          snap_disks.append(new_dev)
6215

    
6216
    finally:
6217
      if self.op.shutdown and instance.admin_up:
6218
        result = self.rpc.call_instance_start(src_node, instance)
6219
        msg = result.RemoteFailMsg()
6220
        if msg:
6221
          _ShutdownInstanceDisks(self, instance)
6222
          raise errors.OpExecError("Could not start instance: %s" % msg)
6223

    
6224
    # TODO: check for size
6225

    
6226
    cluster_name = self.cfg.GetClusterName()
6227
    for idx, dev in enumerate(snap_disks):
6228
      if dev:
6229
        result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6230
                                               instance, cluster_name, idx)
6231
        if result.failed or not result.data:
6232
          self.LogWarning("Could not export block device %s from node %s to"
6233
                          " node %s", dev.logical_id[1], src_node,
6234
                          dst_node.name)
6235
        msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6236
        if msg:
6237
          self.LogWarning("Could not remove snapshot block device %s from node"
6238
                          " %s: %s", dev.logical_id[1], src_node, msg)
6239

    
6240
    result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6241
    if result.failed or not result.data:
6242
      self.LogWarning("Could not finalize export for instance %s on node %s",
6243
                      instance.name, dst_node.name)
6244

    
6245
    nodelist = self.cfg.GetNodeList()
6246
    nodelist.remove(dst_node.name)
6247

    
6248
    # on one-node clusters nodelist will be empty after the removal
6249
    # if we proceed the backup would be removed because OpQueryExports
6250
    # substitutes an empty list with the full cluster node list.
6251
    if nodelist:
6252
      exportlist = self.rpc.call_export_list(nodelist)
6253
      for node in exportlist:
6254
        if exportlist[node].failed:
6255
          continue
6256
        if instance.name in exportlist[node].data:
6257
          if not self.rpc.call_export_remove(node, instance.name):
6258
            self.LogWarning("Could not remove older export for instance %s"
6259
                            " on node %s", instance.name, node)
6260

    
6261

    
6262
class LURemoveExport(NoHooksLU):
6263
  """Remove exports related to the named instance.
6264

6265
  """
6266
  _OP_REQP = ["instance_name"]
6267
  REQ_BGL = False
6268

    
6269
  def ExpandNames(self):
6270
    self.needed_locks = {}
6271
    # We need all nodes to be locked in order for RemoveExport to work, but we
6272
    # don't need to lock the instance itself, as nothing will happen to it (and
6273
    # we can remove exports also for a removed instance)
6274
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6275

    
6276
  def CheckPrereq(self):
6277
    """Check prerequisites.
6278
    """
6279
    pass
6280

    
6281
  def Exec(self, feedback_fn):
6282
    """Remove any export.
6283

6284
    """
6285
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6286
    # If the instance was not found we'll try with the name that was passed in.
6287
    # This will only work if it was an FQDN, though.
6288
    fqdn_warn = False
6289
    if not instance_name:
6290
      fqdn_warn = True
6291
      instance_name = self.op.instance_name
6292

    
6293
    exportlist = self.rpc.call_export_list(self.acquired_locks[
6294
      locking.LEVEL_NODE])
6295
    found = False
6296
    for node in exportlist:
6297
      if exportlist[node].failed:
6298
        self.LogWarning("Failed to query node %s, continuing" % node)
6299
        continue
6300
      if instance_name in exportlist[node].data:
6301
        found = True
6302
        result = self.rpc.call_export_remove(node, instance_name)
6303
        if result.failed or not result.data:
6304
          logging.error("Could not remove export for instance %s"
6305
                        " on node %s", instance_name, node)
6306

    
6307
    if fqdn_warn and not found:
6308
      feedback_fn("Export not found. If trying to remove an export belonging"
6309
                  " to a deleted instance please use its Fully Qualified"
6310
                  " Domain Name.")
6311

    
6312

    
6313
class TagsLU(NoHooksLU):
6314
  """Generic tags LU.
6315

6316
  This is an abstract class which is the parent of all the other tags LUs.
6317

6318
  """
6319

    
6320
  def ExpandNames(self):
6321
    self.needed_locks = {}
6322
    if self.op.kind == constants.TAG_NODE:
6323
      name = self.cfg.ExpandNodeName(self.op.name)
6324
      if name is None:
6325
        raise errors.OpPrereqError("Invalid node name (%s)" %
6326
                                   (self.op.name,))
6327
      self.op.name = name
6328
      self.needed_locks[locking.LEVEL_NODE] = name
6329
    elif self.op.kind == constants.TAG_INSTANCE:
6330
      name = self.cfg.ExpandInstanceName(self.op.name)
6331
      if name is None:
6332
        raise errors.OpPrereqError("Invalid instance name (%s)" %
6333
                                   (self.op.name,))
6334
      self.op.name = name
6335
      self.needed_locks[locking.LEVEL_INSTANCE] = name
6336

    
6337
  def CheckPrereq(self):
6338
    """Check prerequisites.
6339

6340
    """
6341
    if self.op.kind == constants.TAG_CLUSTER:
6342
      self.target = self.cfg.GetClusterInfo()
6343
    elif self.op.kind == constants.TAG_NODE:
6344
      self.target = self.cfg.GetNodeInfo(self.op.name)
6345
    elif self.op.kind == constants.TAG_INSTANCE:
6346
      self.target = self.cfg.GetInstanceInfo(self.op.name)
6347
    else:
6348
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6349
                                 str(self.op.kind))
6350

    
6351

    
6352
class LUGetTags(TagsLU):
6353
  """Returns the tags of a given object.
6354

6355
  """
6356
  _OP_REQP = ["kind", "name"]
6357
  REQ_BGL = False
6358

    
6359
  def Exec(self, feedback_fn):
6360
    """Returns the tag list.
6361

6362
    """
6363
    return list(self.target.GetTags())
6364

    
6365

    
6366
class LUSearchTags(NoHooksLU):
6367
  """Searches the tags for a given pattern.
6368

6369
  """
6370
  _OP_REQP = ["pattern"]
6371
  REQ_BGL = False
6372

    
6373
  def ExpandNames(self):
6374
    self.needed_locks = {}
6375

    
6376
  def CheckPrereq(self):
6377
    """Check prerequisites.
6378

6379
    This checks the pattern passed for validity by compiling it.
6380

6381
    """
6382
    try:
6383
      self.re = re.compile(self.op.pattern)
6384
    except re.error, err:
6385
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6386
                                 (self.op.pattern, err))
6387

    
6388
  def Exec(self, feedback_fn):
6389
    """Returns the tag list.
6390

6391
    """
6392
    cfg = self.cfg
6393
    tgts = [("/cluster", cfg.GetClusterInfo())]
6394
    ilist = cfg.GetAllInstancesInfo().values()
6395
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6396
    nlist = cfg.GetAllNodesInfo().values()
6397
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6398
    results = []
6399
    for path, target in tgts:
6400
      for tag in target.GetTags():
6401
        if self.re.search(tag):
6402
          results.append((path, tag))
6403
    return results
6404

    
6405

    
6406
class LUAddTags(TagsLU):
6407
  """Sets a tag on a given object.
6408

6409
  """
6410
  _OP_REQP = ["kind", "name", "tags"]
6411
  REQ_BGL = False
6412

    
6413
  def CheckPrereq(self):
6414
    """Check prerequisites.
6415

6416
    This checks the type and length of the tag name and value.
6417

6418
    """
6419
    TagsLU.CheckPrereq(self)
6420
    for tag in self.op.tags:
6421
      objects.TaggableObject.ValidateTag(tag)
6422

    
6423
  def Exec(self, feedback_fn):
6424
    """Sets the tag.
6425

6426
    """
6427
    try:
6428
      for tag in self.op.tags:
6429
        self.target.AddTag(tag)
6430
    except errors.TagError, err:
6431
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
6432
    try:
6433
      self.cfg.Update(self.target)
6434
    except errors.ConfigurationError:
6435
      raise errors.OpRetryError("There has been a modification to the"
6436
                                " config file and the operation has been"
6437
                                " aborted. Please retry.")
6438

    
6439

    
6440
class LUDelTags(TagsLU):
6441
  """Delete a list of tags from a given object.
6442

6443
  """
6444
  _OP_REQP = ["kind", "name", "tags"]
6445
  REQ_BGL = False
6446

    
6447
  def CheckPrereq(self):
6448
    """Check prerequisites.
6449

6450
    This checks that we have the given tag.
6451

6452
    """
6453
    TagsLU.CheckPrereq(self)
6454
    for tag in self.op.tags:
6455
      objects.TaggableObject.ValidateTag(tag)
6456
    del_tags = frozenset(self.op.tags)
6457
    cur_tags = self.target.GetTags()
6458
    if not del_tags <= cur_tags:
6459
      diff_tags = del_tags - cur_tags
6460
      diff_names = ["'%s'" % tag for tag in diff_tags]
6461
      diff_names.sort()
6462
      raise errors.OpPrereqError("Tag(s) %s not found" %
6463
                                 (",".join(diff_names)))
6464

    
6465
  def Exec(self, feedback_fn):
6466
    """Remove the tag from the object.
6467

6468
    """
6469
    for tag in self.op.tags:
6470
      self.target.RemoveTag(tag)
6471
    try:
6472
      self.cfg.Update(self.target)
6473
    except errors.ConfigurationError:
6474
      raise errors.OpRetryError("There has been a modification to the"
6475
                                " config file and the operation has been"
6476
                                " aborted. Please retry.")
6477

    
6478

    
6479
class LUTestDelay(NoHooksLU):
6480
  """Sleep for a specified amount of time.
6481

6482
  This LU sleeps on the master and/or nodes for a specified amount of
6483
  time.
6484

6485
  """
6486
  _OP_REQP = ["duration", "on_master", "on_nodes"]
6487
  REQ_BGL = False
6488

    
6489
  def ExpandNames(self):
6490
    """Expand names and set required locks.
6491

6492
    This expands the node list, if any.
6493

6494
    """
6495
    self.needed_locks = {}
6496
    if self.op.on_nodes:
6497
      # _GetWantedNodes can be used here, but is not always appropriate to use
6498
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6499
      # more information.
6500
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6501
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6502

    
6503
  def CheckPrereq(self):
6504
    """Check prerequisites.
6505

6506
    """
6507

    
6508
  def Exec(self, feedback_fn):
6509
    """Do the actual sleep.
6510

6511
    """
6512
    if self.op.on_master:
6513
      if not utils.TestDelay(self.op.duration):
6514
        raise errors.OpExecError("Error during master delay test")
6515
    if self.op.on_nodes:
6516
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6517
      if not result:
6518
        raise errors.OpExecError("Complete failure from rpc call")
6519
      for node, node_result in result.items():
6520
        node_result.Raise()
6521
        if not node_result.data:
6522
          raise errors.OpExecError("Failure during rpc call to node %s,"
6523
                                   " result: %s" % (node, node_result.data))
6524

    
6525

    
6526
class IAllocator(object):
6527
  """IAllocator framework.
6528

6529
  An IAllocator instance has three sets of attributes:
6530
    - cfg that is needed to query the cluster
6531
    - input data (all members of the _KEYS class attribute are required)
6532
    - four buffer attributes (in|out_data|text), that represent the
6533
      input (to the external script) in text and data structure format,
6534
      and the output from it, again in two formats
6535
    - the result variables from the script (success, info, nodes) for
6536
      easy usage
6537

6538
  """
6539
  _ALLO_KEYS = [
6540
    "mem_size", "disks", "disk_template",
6541
    "os", "tags", "nics", "vcpus", "hypervisor",
6542
    ]
6543
  _RELO_KEYS = [
6544
    "relocate_from",
6545
    ]
6546

    
6547
  def __init__(self, lu, mode, name, **kwargs):
6548
    self.lu = lu
6549
    # init buffer variables
6550
    self.in_text = self.out_text = self.in_data = self.out_data = None
6551
    # init all input fields so that pylint is happy
6552
    self.mode = mode
6553
    self.name = name
6554
    self.mem_size = self.disks = self.disk_template = None
6555
    self.os = self.tags = self.nics = self.vcpus = None
6556
    self.hypervisor = None
6557
    self.relocate_from = None
6558
    # computed fields
6559
    self.required_nodes = None
6560
    # init result fields
6561
    self.success = self.info = self.nodes = None
6562
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6563
      keyset = self._ALLO_KEYS
6564
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6565
      keyset = self._RELO_KEYS
6566
    else:
6567
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6568
                                   " IAllocator" % self.mode)
6569
    for key in kwargs:
6570
      if key not in keyset:
6571
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
6572
                                     " IAllocator" % key)
6573
      setattr(self, key, kwargs[key])
6574
    for key in keyset:
6575
      if key not in kwargs:
6576
        raise errors.ProgrammerError("Missing input parameter '%s' to"
6577
                                     " IAllocator" % key)
6578
    self._BuildInputData()
6579

    
6580
  def _ComputeClusterData(self):
6581
    """Compute the generic allocator input data.
6582

6583
    This is the data that is independent of the actual operation.
6584

6585
    """
6586
    cfg = self.lu.cfg
6587
    cluster_info = cfg.GetClusterInfo()
6588
    # cluster data
6589
    data = {
6590
      "version": constants.IALLOCATOR_VERSION,
6591
      "cluster_name": cfg.GetClusterName(),
6592
      "cluster_tags": list(cluster_info.GetTags()),
6593
      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6594
      # we don't have job IDs
6595
      }
6596
    iinfo = cfg.GetAllInstancesInfo().values()
6597
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6598

    
6599
    # node data
6600
    node_results = {}
6601
    node_list = cfg.GetNodeList()
6602

    
6603
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6604
      hypervisor_name = self.hypervisor
6605
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6606
      hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6607

    
6608
    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6609
                                           hypervisor_name)
6610
    node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6611
                       cluster_info.enabled_hypervisors)
6612
    for nname, nresult in node_data.items():
6613
      # first fill in static (config-based) values
6614
      ninfo = cfg.GetNodeInfo(nname)
6615
      pnr = {
6616
        "tags": list(ninfo.GetTags()),
6617
        "primary_ip": ninfo.primary_ip,
6618
        "secondary_ip": ninfo.secondary_ip,
6619
        "offline": ninfo.offline,
6620
        "drained": ninfo.drained,
6621
        "master_candidate": ninfo.master_candidate,
6622
        }
6623

    
6624
      if not ninfo.offline:
6625
        nresult.Raise()
6626
        if not isinstance(nresult.data, dict):
6627
          raise errors.OpExecError("Can't get data for node %s" % nname)
6628
        remote_info = nresult.data
6629
        for attr in ['memory_total', 'memory_free', 'memory_dom0',
6630
                     'vg_size', 'vg_free', 'cpu_total']:
6631
          if attr not in remote_info:
6632
            raise errors.OpExecError("Node '%s' didn't return attribute"
6633
                                     " '%s'" % (nname, attr))
6634
          try:
6635
            remote_info[attr] = int(remote_info[attr])
6636
          except ValueError, err:
6637
            raise errors.OpExecError("Node '%s' returned invalid value"
6638
                                     " for '%s': %s" % (nname, attr, err))
6639
        # compute memory used by primary instances
6640
        i_p_mem = i_p_up_mem = 0
6641
        for iinfo, beinfo in i_list:
6642
          if iinfo.primary_node == nname:
6643
            i_p_mem += beinfo[constants.BE_MEMORY]
6644
            if iinfo.name not in node_iinfo[nname].data:
6645
              i_used_mem = 0
6646
            else:
6647
              i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6648
            i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6649
            remote_info['memory_free'] -= max(0, i_mem_diff)
6650

    
6651
            if iinfo.admin_up:
6652
              i_p_up_mem += beinfo[constants.BE_MEMORY]
6653

    
6654
        # compute memory used by instances
6655
        pnr_dyn = {
6656
          "total_memory": remote_info['memory_total'],
6657
          "reserved_memory": remote_info['memory_dom0'],
6658
          "free_memory": remote_info['memory_free'],
6659
          "total_disk": remote_info['vg_size'],
6660
          "free_disk": remote_info['vg_free'],
6661
          "total_cpus": remote_info['cpu_total'],
6662
          "i_pri_memory": i_p_mem,
6663
          "i_pri_up_memory": i_p_up_mem,
6664
          }
6665
        pnr.update(pnr_dyn)
6666

    
6667
      node_results[nname] = pnr
6668
    data["nodes"] = node_results
6669

    
6670
    # instance data
6671
    instance_data = {}
6672
    for iinfo, beinfo in i_list:
6673
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6674
                  for n in iinfo.nics]
6675
      pir = {
6676
        "tags": list(iinfo.GetTags()),
6677
        "admin_up": iinfo.admin_up,
6678
        "vcpus": beinfo[constants.BE_VCPUS],
6679
        "memory": beinfo[constants.BE_MEMORY],
6680
        "os": iinfo.os,
6681
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6682
        "nics": nic_data,
6683
        "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6684
        "disk_template": iinfo.disk_template,
6685
        "hypervisor": iinfo.hypervisor,
6686
        }
6687
      instance_data[iinfo.name] = pir
6688

    
6689
    data["instances"] = instance_data
6690

    
6691
    self.in_data = data
6692

    
6693
  def _AddNewInstance(self):
6694
    """Add new instance data to allocator structure.
6695

6696
    This in combination with _AllocatorGetClusterData will create the
6697
    correct structure needed as input for the allocator.
6698

6699
    The checks for the completeness of the opcode must have already been
6700
    done.
6701

6702
    """
6703
    data = self.in_data
6704

    
6705
    disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6706

    
6707
    if self.disk_template in constants.DTS_NET_MIRROR:
6708
      self.required_nodes = 2
6709
    else:
6710
      self.required_nodes = 1
6711
    request = {
6712
      "type": "allocate",
6713
      "name": self.name,
6714
      "disk_template": self.disk_template,
6715
      "tags": self.tags,
6716
      "os": self.os,
6717
      "vcpus": self.vcpus,
6718
      "memory": self.mem_size,
6719
      "disks": self.disks,
6720
      "disk_space_total": disk_space,
6721
      "nics": self.nics,
6722
      "required_nodes": self.required_nodes,
6723
      }
6724
    data["request"] = request
6725

    
6726
  def _AddRelocateInstance(self):
6727
    """Add relocate instance data to allocator structure.
6728

6729
    This in combination with _IAllocatorGetClusterData will create the
6730
    correct structure needed as input for the allocator.
6731

6732
    The checks for the completeness of the opcode must have already been
6733
    done.
6734

6735
    """
6736
    instance = self.lu.cfg.GetInstanceInfo(self.name)
6737
    if instance is None:
6738
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
6739
                                   " IAllocator" % self.name)
6740

    
6741
    if instance.disk_template not in constants.DTS_NET_MIRROR:
6742
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6743

    
6744
    if len(instance.secondary_nodes) != 1:
6745
      raise errors.OpPrereqError("Instance has not exactly one secondary node")
6746

    
6747
    self.required_nodes = 1
6748
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
6749
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6750

    
6751
    request = {
6752
      "type": "relocate",
6753
      "name": self.name,
6754
      "disk_space_total": disk_space,
6755
      "required_nodes": self.required_nodes,
6756
      "relocate_from": self.relocate_from,
6757
      }
6758
    self.in_data["request"] = request
6759

    
6760
  def _BuildInputData(self):
6761
    """Build input data structures.
6762

6763
    """
6764
    self._ComputeClusterData()
6765

    
6766
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6767
      self._AddNewInstance()
6768
    else:
6769
      self._AddRelocateInstance()
6770

    
6771
    self.in_text = serializer.Dump(self.in_data)
6772

    
6773
  def Run(self, name, validate=True, call_fn=None):
6774
    """Run an instance allocator and return the results.
6775

6776
    """
6777
    if call_fn is None:
6778
      call_fn = self.lu.rpc.call_iallocator_runner
6779
    data = self.in_text
6780

    
6781
    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6782
    result.Raise()
6783

    
6784
    if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6785
      raise errors.OpExecError("Invalid result from master iallocator runner")
6786

    
6787
    rcode, stdout, stderr, fail = result.data
6788

    
6789
    if rcode == constants.IARUN_NOTFOUND:
6790
      raise errors.OpExecError("Can't find allocator '%s'" % name)
6791
    elif rcode == constants.IARUN_FAILURE:
6792
      raise errors.OpExecError("Instance allocator call failed: %s,"
6793
                               " output: %s" % (fail, stdout+stderr))
6794
    self.out_text = stdout
6795
    if validate:
6796
      self._ValidateResult()
6797

    
6798
  def _ValidateResult(self):
6799
    """Process the allocator results.
6800

6801
    This will process and if successful save the result in
6802
    self.out_data and the other parameters.
6803

6804
    """
6805
    try:
6806
      rdict = serializer.Load(self.out_text)
6807
    except Exception, err:
6808
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6809

    
6810
    if not isinstance(rdict, dict):
6811
      raise errors.OpExecError("Can't parse iallocator results: not a dict")
6812

    
6813
    for key in "success", "info", "nodes":
6814
      if key not in rdict:
6815
        raise errors.OpExecError("Can't parse iallocator results:"
6816
                                 " missing key '%s'" % key)
6817
      setattr(self, key, rdict[key])
6818

    
6819
    if not isinstance(rdict["nodes"], list):
6820
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6821
                               " is not a list")
6822
    self.out_data = rdict
6823

    
6824

    
6825
class LUTestAllocator(NoHooksLU):
6826
  """Run allocator tests.
6827

6828
  This LU runs the allocator tests
6829

6830
  """
6831
  _OP_REQP = ["direction", "mode", "name"]
6832

    
6833
  def CheckPrereq(self):
6834
    """Check prerequisites.
6835

6836
    This checks the opcode parameters depending on the director and mode test.
6837

6838
    """
6839
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6840
      for attr in ["name", "mem_size", "disks", "disk_template",
6841
                   "os", "tags", "nics", "vcpus"]:
6842
        if not hasattr(self.op, attr):
6843
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
6844
                                     attr)
6845
      iname = self.cfg.ExpandInstanceName(self.op.name)
6846
      if iname is not None:
6847
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
6848
                                   iname)
6849
      if not isinstance(self.op.nics, list):
6850
        raise errors.OpPrereqError("Invalid parameter 'nics'")
6851
      for row in self.op.nics:
6852
        if (not isinstance(row, dict) or
6853
            "mac" not in row or
6854
            "ip" not in row or
6855
            "bridge" not in row):
6856
          raise errors.OpPrereqError("Invalid contents of the"
6857
                                     " 'nics' parameter")
6858
      if not isinstance(self.op.disks, list):
6859
        raise errors.OpPrereqError("Invalid parameter 'disks'")
6860
      for row in self.op.disks:
6861
        if (not isinstance(row, dict) or
6862
            "size" not in row or
6863
            not isinstance(row["size"], int) or
6864
            "mode" not in row or
6865
            row["mode"] not in ['r', 'w']):
6866
          raise errors.OpPrereqError("Invalid contents of the"
6867
                                     " 'disks' parameter")
6868
      if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
6869
        self.op.hypervisor = self.cfg.GetHypervisorType()
6870
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
6871
      if not hasattr(self.op, "name"):
6872
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
6873
      fname = self.cfg.ExpandInstanceName(self.op.name)
6874
      if fname is None:
6875
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
6876
                                   self.op.name)
6877
      self.op.name = fname
6878
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
6879
    else:
6880
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
6881
                                 self.op.mode)
6882

    
6883
    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
6884
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
6885
        raise errors.OpPrereqError("Missing allocator name")
6886
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
6887
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
6888
                                 self.op.direction)
6889

    
6890
  def Exec(self, feedback_fn):
6891
    """Run the allocator test.
6892

6893
    """
6894
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
6895
      ial = IAllocator(self,
6896
                       mode=self.op.mode,
6897
                       name=self.op.name,
6898
                       mem_size=self.op.mem_size,
6899
                       disks=self.op.disks,
6900
                       disk_template=self.op.disk_template,
6901
                       os=self.op.os,
6902
                       tags=self.op.tags,
6903
                       nics=self.op.nics,
6904
                       vcpus=self.op.vcpus,
6905
                       hypervisor=self.op.hypervisor,
6906
                       )
6907
    else:
6908
      ial = IAllocator(self,
6909
                       mode=self.op.mode,
6910
                       name=self.op.name,
6911
                       relocate_from=list(self.relocate_from),
6912
                       )
6913

    
6914
    if self.op.direction == constants.IALLOCATOR_DIR_IN:
6915
      result = ial.in_text
6916
    else:
6917
      ial.Run(self.op.allocator, validate=False)
6918
      result = ial.out_text
6919
    return result