#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import time
import tempfile
import re
import platform
import logging
import copy
import random

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer
from ganeti import ssconf

class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    self.CheckArguments()

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensuring the
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods don't need to worry about missing parameters.

    """
    pass

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have the 'GANETI_' prefix, as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    If there are no nodes, an empty list (and not None) should be returned.

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]


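# Note added for clarity (not part of the original module): based on the
# docstrings above, the processor drives a LogicalUnit roughly in this order:
# __init__ validates _OP_REQP and calls CheckArguments, ExpandNames runs
# before any locks are taken, DeclareLocks runs once per declared locking
# level just before that level is acquired, CheckPrereq runs with the locks
# held, and Exec finally does the work (with BuildHooksEnv/HooksCallBack used
# around it when HPATH is not None).

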
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


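# Illustrative sketch (not part of the original cmdlib.py): a minimal LU
# following the rules from the LogicalUnit docstring above. The opcode's
# "nodes" attribute is assumed purely for demonstration; the class is not
# registered anywhere and is never run by the dispatcher.
class _ExampleLUListNodeNames(NoHooksLU):
  """Example-only LU that expands and returns the requested node names.

  """
  _OP_REQP = ["nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    # Declare shared (not exclusive) locks on just the nodes we were asked
    # about; _GetWantedNodes also expands and validates the names.
    self.needed_locks = {
      locking.LEVEL_NODE: _GetWantedNodes(self, self.op.nodes),
      }
    self.share_locks[locking.LEVEL_NODE] = 1

  def CheckPrereq(self):
    # Nothing cluster-related to verify for this example.
    pass

  def Exec(self, feedback_fn):
    # The processor fills acquired_locks while taking the declared locks.
    return self.acquired_locks[locking.LEVEL_NODE]

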
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.OpPrereqError: if the nodes parameter is wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
  return wanted


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta))


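# Illustrative usage sketch (added comment, not in the original module); the
# field names below are just examples. A query LU would typically validate its
# output_fields opcode parameter like this:
#
#   _CheckOutputFields(static=utils.FieldSet("name", "pinst_cnt"),
#                      dynamic=utils.FieldSet("mfree", "dfree"),
#                      selected=self.op.output_fields)
#
# Any selected field matching neither set raises errors.OpPrereqError.

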
def _CheckBooleanOpField(op, name):
  """Validates boolean opcode parameters.

  This will ensure that an opcode parameter is either a boolean value,
  or None (but that it always exists).

  """
  val = getattr(op, name, None)
  if not (val is None or isinstance(val, bool)):
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
                               (name, str(val)))
  setattr(op, name, val)


def _CheckNodeOnline(lu, node):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is offline

  """
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("Can't use offline node %s" % node)


def _CheckNodeNotDrained(lu, node):
  """Ensure that a given node is not drained.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is drained

  """
  if lu.cfg.GetNodeInfo(node).drained:
    raise errors.OpPrereqError("Can't use drained node %s" % node)


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks):
  """Builds instance related env variables for hooks

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, bridge, mac) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @type disks: list
  @param disks: the list of (size, mode) pairs
  @rtype: dict
  @return: the hook environment for this instance

  """
  if status:
    str_status = "up"
  else:
    str_status = "down"
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_MAC" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  if disks:
    disk_count = len(disks)
    for idx, (size, mode) in enumerate(disks):
      env["INSTANCE_DISK%d_SIZE" % idx] = size
      env["INSTANCE_DISK%d_MODE" % idx] = mode
  else:
    disk_count = 0

  env["INSTANCE_DISK_COUNT"] = disk_count

  return env


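# Illustrative usage sketch (added comment, not in the original module); all
# values below are made up purely to show the resulting environment keys:
#
#   env = _BuildInstanceHookEnv("inst1.example.tld", "node1.example.tld",
#                               [], "debian-etch", True, 128, 1,
#                               [("198.51.100.10", "xen-br0",
#                                 "aa:00:00:00:00:01")],
#                               "plain", [(10240, "rw")])
#   # env["INSTANCE_STATUS"] == "up", env["INSTANCE_NIC_COUNT"] == 1,
#   # env["INSTANCE_DISK0_SIZE"] == 10240, etc.; per the BuildHooksEnv
#   # docstring, the hooks runner later adds the "GANETI_" prefix before
#   # exporting these to the hook scripts.

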
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.admin_up,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
    'disk_template': instance.disk_template,
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _AdjustCandidatePool(lu):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool()
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               ", ".join(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def _CheckInstanceBridgesExist(lu, instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
  result.Raise()
  if not result.data:
    raise errors.OpPrereqError("One or more target bridges %s does not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise()
    if not result.data:
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
                  node_result, feedback_fn, master_files,
                  drbd_map, vg_name):
    """Run multiple tests against a node.

    Test list:

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type nodeinfo: L{objects.Node}
    @param nodeinfo: the node to check
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @param node_result: the results from the node
    @param feedback_fn: function used to accumulate results
    @param master_files: list of files that only masters should have
    @param drbd_map: the used drbd minors for this node, in
        form of minor: (instance, must_exist) which correspond to instances
        and their running status
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())

    """
    node = nodeinfo.name

    # main result, node_result should be a non-empty dict
    if not node_result or not isinstance(node_result, dict):
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
      return True

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = node_result.get('version', None)
    if not (remote_version and isinstance(remote_version, (list, tuple)) and
            len(remote_version) == 2):
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version[0]:
      feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
                  " node %s %s" % (local_version, node, remote_version[0]))
      return True

    # node seems compatible, we can actually try to look into its results

    bad = False

    # full package version
    if constants.RELEASE_VERSION != remote_version[1]:
      feedback_fn("  - WARNING: software version mismatch: master %s,"
                  " node %s %s" %
                  (constants.RELEASE_VERSION, node, remote_version[1]))

    # checks vg existence and size > 20G
    if vg_name is not None:
      vglist = node_result.get(constants.NV_VGLIST, None)
      if not vglist:
        feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                        (node,))
        bad = True
      else:
        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
          bad = True

    # checks config file checksum

    remote_cksum = node_result.get(constants.NV_FILELIST, None)
    if not isinstance(remote_cksum, dict):
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      for file_name in file_list:
        node_is_mc = nodeinfo.master_candidate
        must_have_file = file_name not in master_files
        if file_name not in remote_cksum:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
          else:
            # not candidate and this is not a must-have file
            bad = True
            feedback_fn("  - ERROR: non master-candidate has old/wrong file"
                        " '%s'" % file_name)
        else:
          # all good, except non-master/non-must have combination
          if not node_is_mc and not must_have_file:
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
                        " candidates" % file_name)

    # checks ssh to any

    if constants.NV_NODELIST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result[constants.NV_NODELIST]:
        bad = True
        for node in node_result[constants.NV_NODELIST]:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result[constants.NV_NODELIST][node]))

    if constants.NV_NODENETTEST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result[constants.NV_NODENETTEST]:
        bad = True
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result[constants.NV_NODENETTEST][node]))

    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))

    # check used drbd list
    if vg_name is not None:
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
      if not isinstance(used_minors, (tuple, list)):
        feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
                    str(used_minors))
      else:
        for minor, (iname, must_exist) in drbd_map.items():
          if minor not in used_minors and must_exist:
            feedback_fn("  - ERROR: drbd minor %d of instance %s is"
                        " not active" % (minor, iname))
            bad = True
        for minor in used_minors:
          if minor not in drbd_map:
            feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
                        minor)
            bad = True

    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn, n_offline):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      if node in n_offline:
        # ignore missing volumes on offline nodes
        continue
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if instanceconfig.admin_up:
      if ((node_current not in node_instance or
          not instance in node_instance[node_current]) and
          node_current not in n_offline):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as a secondary
      # has enough memory to host all the instances it is supposed to take
      # over, should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

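  # Worked example for the N+1 check above (comment added for clarity, with
  # made-up numbers): if node A reports 2048 MB free and is secondary for two
  # auto-balanced instances whose primary is node B, needing 1536 MB and
  # 1024 MB of memory respectively, then needed_mem for the (A, B) pair is
  # 2560 MB > 2048 MB, so the cluster is reported as not N+1 compliant.
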
  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase, and their failure makes
    the output be logged in the verify output and the verification fail.

    """
    all_nodes = self.cfg.GetNodeList()
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
      }
    for node in self.cfg.GetAllNodesInfo().values():
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())

    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
                        for iname in instancelist)
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    n_offline = [] # List of offline nodes
    n_drained = [] # List of nodes being drained
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    master_files = [constants.CLUSTER_CONF_FILE]

    file_names = ssconf.SimpleStore().GetFileList()
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.RAPI_CERT_FILE)
    file_names.extend(master_files)

    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    node_verify_param = {
      constants.NV_FILELIST: file_names,
      constants.NV_NODELIST: [node.name for node in nodeinfo
                              if not node.offline],
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                  node.secondary_ip) for node in nodeinfo
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      }
    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_DRBDLIST] = None
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())

    cluster = self.cfg.GetClusterInfo()
    master_node = self.cfg.GetMasterNode()
    all_drbd_map = self.cfg.ComputeDRBDMap()

    for node_i in nodeinfo:
      node = node_i.name
      nresult = all_nvinfo[node].data

      if node_i.offline:
        feedback_fn("* Skipping offline node %s" % (node,))
        n_offline.append(node)
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained.append(node)
      else:
        ntype = "regular"
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      if all_nvinfo[node].failed or not isinstance(nresult, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_drbd = {}
      for minor, instance in all_drbd_map[node].items():
        if instance not in instanceinfo:
          feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
                      instance)
          # ghost instance should not be running, but otherwise we
          # don't give double warnings (both ghost instance and
          # unallocated minor in use)
          node_drbd[minor] = (instance, False)
        else:
          instance = instanceinfo[instance]
          node_drbd[minor] = (instance.name, instance.admin_up)
      result = self._VerifyNode(node_i, file_names, local_checksums,
                                nresult, feedback_fn, master_files,
                                node_drbd, vg_name)
      bad = bad or result

      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
      if vg_name is None:
        node_volume[node] = {}
      elif isinstance(lvdata, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, utils.SafeEncode(lvdata)))
        bad = True
        node_volume[node] = {}
      elif not isinstance(lvdata, dict):
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = lvdata

      # node_instance
      idata = nresult.get(constants.NV_INSTANCELIST, None)
      if not isinstance(idata, list):
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
                    (node,))
        bad = True
        continue

      node_instance[node] = idata

      # node_info
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
        # FIXME: devise a free space model for file based instances as well
        if vg_name is not None:
          if (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST]):
            feedback_fn("  - ERROR: node %s didn't return data for the"
                        " volume group '%s' - it is either missing or broken" %
                        (node, vg_name))
            bad = True
            continue
          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
      except (ValueError, KeyError):
        feedback_fn("  - ERROR: invalid nodeinfo value returned"
                    " from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = instanceinfo[instance]
      result =  self._VerifyInstance(instance, inst_config, node_volume,
                                     node_instance, feedback_fn, n_offline)
      bad = bad or result
      inst_nodes_offline = []

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      elif pnode not in n_offline:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      if pnode in n_offline:
        inst_nodes_offline.append(pnode)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        elif snode not in n_offline:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))
          bad = True
        if snode in n_offline:
          inst_nodes_offline.append(snode)

      if inst_nodes_offline:
        # warn that the instance lives on offline nodes, and set bad=True
        feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
                    ", ".join(inst_nodes_offline))
        bad = True

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res.failed or res.data is False or not isinstance(res.data, list):
            if res.offline:
              # no need to warn or set fail return value
              continue
            feedback_fn("    Communication failure in hooks execution")
            lu_result = 1
            continue
          for script, hkr, output in res.data:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (not inst.admin_up or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
      # node_volume
      lvs = node_lvs[node]
      if lvs.failed:
        if not lvs.offline:
          self.LogWarning("Connection to node %s failed: %s" %
                          (node, lvs.data))
        continue
      lvs = lvs.data
      if isinstance(lvs, basestring):
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
        res_nlvm[node] = lvs
        continue
      elif not isinstance(lvs, dict):
        logging.warning("Connection to node %s failed or invalid data"
                        " returned", node)
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


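# Added note (comment for clarity, not in the original module): in
# LUVerifyDisks.Exec above, nv_dict maps (node name, logical volume name)
# pairs to the owning instance object, and the returned tuple is
# (nodes that could not be contacted, per-node LVM error strings, names of
# instances with inactive volumes, per-instance lists of missing
# (node, volume) pairs).

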
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    if result.failed or not result.data:
      raise errors.OpExecError("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetNodeList()
      try:
        node_list.remove(master)
      except ValueError:
        pass
      result = self.rpc.call_upload_file(node_list,
                                         constants.SSH_KNOWN_HOSTS_FILE)
      for to_node, to_result in result.iteritems():
        if to_result.failed or not to_result.data:
          logging.error("Copy of file %s to node %s failed",
                        constants.SSH_KNOWN_HOSTS_FILE, to_node)

    finally:
      result = self.rpc.call_node_start_master(master, False)
      if result.failed or not result.data:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV


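# Illustrative note (comment for clarity, not in the original module): for a
# DRBD disk whose children are logical volumes (data and metadata), the
# recursion above finds an LD_LV child and returns True; a disk with no
# children and a dev_type other than LD_LV returns False.

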
class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = []
  REQ_BGL = False

  def CheckParameters(self):
    """Check parameters

    """
    if not hasattr(self.op, "candidate_pool_size"):
      self.op.candidate_pool_size = None
    if self.op.candidate_pool_size is not None:
      try:
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
                                   str(err))
      if self.op.candidate_pool_size < 1:
        raise errors.OpPrereqError("At least one master candidate needed")

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      instances = self.cfg.GetAllInstancesInfo().values()
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    node_list = self.acquired_locks[locking.LEVEL_NODE]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(node_list)
      for node in node_list:
        if vglist[node].failed:
          # ignoring down node
          self.LogWarning("Node %s unreachable/error, ignoring" % node)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate beparams changes
    if self.op.beparams:
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.FillDict(
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)

    # hypervisor list/parameters
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      if not isinstance(self.op.hvparams, dict):
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisor(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      if self.op.vg_name != self.cfg.GetVGName():
        self.cfg.SetVGName(self.op.vg_name)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
1530
                    " state, not changing")
1531
    if self.op.hvparams:
1532
      self.cluster.hvparams = self.new_hvparams
1533
    if self.op.enabled_hypervisors is not None:
1534
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1535
    if self.op.beparams:
1536
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1537
    if self.op.candidate_pool_size is not None:
1538
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1539

    
1540
    self.cfg.Update(self.cluster)
1541

    
1542
    # we want to update nodes after the cluster so that if any errors
1543
    # happen, we have recorded and saved the cluster info
1544
    if self.op.candidate_pool_size is not None:
1545
      _AdjustCandidatePool(self)
1546

    
1547

    
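# Illustrative sketch (hypothetical helper, not used by the LU above): the
# per-hypervisor merge performed in LUSetClusterParams.CheckPrereq, expressed
# on plain dicts so the semantics are easy to see; unknown hypervisors are
# added wholesale, known ones are updated key by key.
def _ExampleMergeHvParams(current, changes):
  """Merge hypervisor parameter overrides into a copy of the current dict.

  """
  merged = dict((name, dict(params)) for name, params in current.items())
  for hv_name, hv_dict in changes.items():
    if hv_name not in merged:
      merged[hv_name] = dict(hv_dict)
    else:
      merged[hv_name].update(hv_dict)
  return merged
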
class LURedistributeConfig(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
    }
    self.share_locks[locking.LEVEL_NODE] = 1

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo())

def _WaitForSync(lu, instance, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    lu.cfg.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if rstats.failed or not rstats.data:
      lu.LogWarning("Can't get any data from node %s", node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    rstats = rstats.data
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node, instance.disks[i].iv_name)
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                        (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded

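# Illustrative sketch of the per-disk status tuples consumed by _WaitForSync
# above (the concrete values are hypothetical): call_blockdev_getmirrorstatus
# returns one (sync_percent, est_time, is_degraded, ldisk) entry per disk, eg.
#
#   [(45.5, 120, True, False),    # still syncing: 45.5% done, ~120s left
#    (None, None, False, False)]  # already in sync
#
# sync_percent is None once a disk is in sync; the loop keeps polling while
# any sync_percent is not None, and the function returns False only if some
# disk reports is_degraded with no sync in progress.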
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  lu.cfg.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node, dev)
    msg = rstats.RemoteFailMsg()
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", node)
      result = False
    else:
      result = result and (not rstats.payload[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)

  return result

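# Illustrative sketch (hypothetical helper, nothing calls it): checking every
# disk of an instance on its primary node with _CheckDiskConsistency above;
# ``lu`` is the calling LogicalUnit and ``instance`` an objects.Instance.
def _ExampleAllDisksConsistent(lu, instance):
  """Return True only if no disk of the instance is degraded on its primary.

  """
  consistent = True
  for dev in instance.disks:
    consistent = consistent and _CheckDiskConsistency(lu, dev,
                                                      instance.primary_node,
                                                      True)
  return consistent
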
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet()
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")

  def ExpandNames(self):
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    # Lock all nodes, in shared mode
    # Temporary removal of locks, should be reverted later
    # TODO: reintroduce locks when they are lighter-weight
    self.needed_locks = {}
    #self.share_locks[locking.LEVEL_NODE] = 1
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.

    """

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary.

    @param node_list: a list with the names of all nodes
    @param rlist: a map with node names as keys and OS objects as values

    @rtype: dict
    @return: a dictionary with osnames as keys and as value another map, with
        nodes as keys and list of OS objects as values, eg::

          {"debian-etch": {"node1": [<object>,...],
                           "node2": [<object>,]}
          }

    """
    all_os = {}
    # we build here the list of nodes that didn't fail the RPC (at RPC
    # level), so that nodes with a non-responding node daemon don't
    # make all OSes invalid
    good_nodes = [node_name for node_name in rlist
                  if not rlist[node_name].failed]
    for node_name, nr in rlist.iteritems():
      if nr.failed or not nr.data:
        continue
      for os_obj in nr.data:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in good_nodes:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
    node_data = self.rpc.call_os_diagnose(valid_nodes)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(valid_nodes, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output

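# Illustrative sketch of the remapping done by LUDiagnoseOS._DiagnoseByOS
# above, with hypothetical node and OS names (the list entries are objects.OS
# instances in real use):
#
#   node1 reports [debian-etch, centos-5], node2 reports [debian-etch]
#   => {"debian-etch": {"node1": [<OS>], "node2": [<OS>]},
#       "centos-5":    {"node1": [<OS>], "node2": []}}
#
# The empty list for node2/centos-5 is what later makes the "valid" output
# field False for that OS.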
class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.cfg.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logging.info("Stopping the node daemon and removing configs from node %s",
                 node.name)

    self.context.RemoveNode(node.name)

    self.rpc.call_node_leave_cluster(node.name)

    # Promote nodes to master candidate as needed
    _AdjustCandidatePool(self)

class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names", "use_locking"]
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet(
    "dtotal", "dfree",
    "mtotal", "mnode", "mfree",
    "bootid",
    "ctotal", "cnodes", "csockets",
    )

  _FIELDS_STATIC = utils.FieldSet(
    "name", "pinst_cnt", "sinst_cnt",
    "pinst_list", "sinst_list",
    "pip", "sip", "tags",
    "serial_no",
    "master_candidate",
    "master",
    "offline",
    "drained",
    )

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedNodes(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      # if we don't request only static fields, we need to lock the nodes
      self.needed_locks[locking.LEVEL_NODE] = self.wanted

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # The validation of the node list is done in the _GetWantedNodes,
    # if non empty, and if empty, there's no validation to do
    pass

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    all_info = self.cfg.GetAllNodesInfo()
    if self.do_locking:
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
    elif self.wanted != locking.ALL_SET:
      nodenames = self.wanted
      missing = set(nodenames).difference(all_info.keys())
      if missing:
        raise errors.OpExecError(
          "Some nodes were removed before retrieving their data: %s" % missing)
    else:
      nodenames = all_info.keys()

    nodenames = utils.NiceSort(nodenames)
    nodelist = [all_info[name] for name in nodenames]

    # begin data gathering

    if self.do_node_query:
      live_data = {}
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                          self.cfg.GetHypervisorType())
      for name in nodenames:
        nodeinfo = node_data[name]
        if not nodeinfo.failed and nodeinfo.data:
          nodeinfo = nodeinfo.data
          fn = utils.TryConvert
          live_data[name] = {
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
            "bootid": nodeinfo.get('bootid', None),
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    master_node = self.cfg.GetMasterNode()

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field == "serial_no":
          val = node.serial_no
        elif field == "master_candidate":
          val = node.master_candidate
        elif field == "master":
          val = node.name == master_node
        elif field == "offline":
          val = node.offline
        elif field == "drained":
          val = node.drained
        elif self._FIELDS_DYNAMIC.Matches(field):
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output

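# Illustrative sketch of the LUQueryNodes result shape, with hypothetical node
# names and values: for output_fields ["name", "mfree", "pinst_cnt"], Exec
# returns one row per node, in NiceSort order, eg.
#
#   [["node1.example.com", 2048, 3],
#    ["node2.example.com", 1024, 0]]
#
# "mfree" is a dynamic field, so it is None for nodes whose live data could
# not be fetched.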
class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = self.rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or volumes[node].failed or not volumes[node].data:
        continue

      node_vols = volumes[node].data[:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output

class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # check reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
    mc_now, _ = self.cfg.GetMasterCandidateStats()
    master_candidate = mc_now < cp_size

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip,
                                 master_candidate=master_candidate,
                                 offline=False, drained=False)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    result = self.rpc.call_version([node])[node]
    result.Raise()
    if result.data:
      if constants.PROTOCOL_VERSION == result.data:
        logging.info("Communication to node %s fine, sw version %s match",
                     node, result.data)
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result.data))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logging.info("Copy ssh key to node %s", node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
                                    keyarray[2],
                                    keyarray[3], keyarray[4], keyarray[5])

    msg = result.RemoteFailMsg()
    if msg:
      raise errors.OpExecError("Cannot transfer ssh keys to the"
                               " new node: %s" % msg)

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      result = self.rpc.call_node_has_ip_address(new_node.name,
                                                 new_node.secondary_ip)
      if result.failed or not result.data:
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    node_verify_list = [self.cfg.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
    }

    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
                                       self.cfg.GetClusterName())
    for verifier in node_verify_list:
      if result[verifier].failed or not result[verifier].data:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                 " for remote verification" % verifier)
      if result[verifier].data['nodelist']:
        for failed in result[verifier].data['nodelist']:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, result[verifier].data['nodelist'][failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logging.debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = self.rpc.call_upload_file(dist_nodes, fname)
      for to_node, to_result in result.iteritems():
        if to_result.failed or not to_result.data:
          logging.error("Copy of file %s to node %s failed", fname, to_node)

    to_copy = []
    enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
      to_copy.append(constants.VNC_PASSWORD_FILE)

    for fname in to_copy:
      result = self.rpc.call_upload_file([node], fname)
      if result[node].failed or not result[node]:
        logging.error("Could not copy file %s to node %s", fname, node)

    if self.op.readd:
      self.context.ReaddNode(new_node)
    else:
      self.context.AddNode(new_node)

class LUSetNodeParams(LogicalUnit):
  """Modifies the parameters of a node.

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]
  REQ_BGL = False

  def CheckArguments(self):
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
    if node_name is None:
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
    self.op.node_name = node_name
    _CheckBooleanOpField(self.op, 'master_candidate')
    _CheckBooleanOpField(self.op, 'offline')
    _CheckBooleanOpField(self.op, 'drained')
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
    if all_mods.count(None) == 3:
      raise errors.OpPrereqError("Please pass at least one modification")
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time")

  def ExpandNames(self):
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      }
    nl = [self.cfg.GetMasterNode(),
          self.op.node_name]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the requested flag changes are compatible with the
    node's current role and with the cluster's master candidate pool.

    """
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)

    if ((self.op.master_candidate == False or self.op.offline == True or
         self.op.drained == True) and node.master_candidate):
      # we will demote the node from master_candidate
      if self.op.node_name == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master node has to be a"
                                   " master candidate, online and not drained")
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
      num_candidates, _ = self.cfg.GetMasterCandidateStats()
      if num_candidates <= cp_size:
        msg = ("Not enough master candidates (desired"
               " %d, new value will be %d)" % (cp_size, num_candidates-1))
        if self.op.force:
          self.LogWarning(msg)
        else:
          raise errors.OpPrereqError(msg)

    if (self.op.master_candidate == True and
        ((node.offline and not self.op.offline == False) or
         (node.drained and not self.op.drained == False))):
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
                                 " to master_candidate" % node.name)

    return

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.node

    result = []
    changed_mc = False

    if self.op.offline is not None:
      node.offline = self.op.offline
      result.append(("offline", str(self.op.offline)))
      if self.op.offline == True:
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          result.append(("master_candidate", "auto-demotion due to offline"))
        if node.drained:
          node.drained = False
          result.append(("drained", "clear drained status due to offline"))

    if self.op.master_candidate is not None:
      node.master_candidate = self.op.master_candidate
      changed_mc = True
      result.append(("master_candidate", str(self.op.master_candidate)))
      if self.op.master_candidate == False:
        rrc = self.rpc.call_node_demote_from_mc(node.name)
        msg = rrc.RemoteFailMsg()
        if msg:
          self.LogWarning("Node failed to demote itself: %s" % msg)

    if self.op.drained is not None:
      node.drained = self.op.drained
      result.append(("drained", str(self.op.drained)))
      if self.op.drained == True:
        if node.master_candidate:
          node.master_candidate = False
          changed_mc = True
          result.append(("master_candidate", "auto-demotion due to drain"))
        if node.offline:
          node.offline = False
          result.append(("offline", "clear offline status due to drain"))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node)
    # this will trigger job queue propagation or cleanup
    if changed_mc:
      self.context.ReaddNode(node)

    return result

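# Illustrative sketch of the LUSetNodeParams.Exec result (hypothetical node
# state): draining a node that is currently a master candidate produces a
# change list such as
#
#   [("drained", "True"),
#    ("master_candidate", "auto-demotion due to drain")]
#
# i.e. the explicit modification first, followed by any automatic side effect.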
class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "architecture": (platform.architecture()[0], platform.machine()),
      "name": cluster.cluster_name,
      "master": cluster.master_node,
      "default_hypervisor": cluster.default_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
                        for hypervisor in cluster.enabled_hypervisors]),
      "beparams": cluster.beparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      }

    return result

class LUQueryConfigValues(NoHooksLU):
  """Return configuration values.

  """
  _OP_REQP = []
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet()
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")

  def ExpandNames(self):
    self.needed_locks = {}

    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Return the requested configuration values.

    """
    values = []
    for field in self.op.output_fields:
      if field == "cluster_name":
        entry = self.cfg.GetClusterName()
      elif field == "master_node":
        entry = self.cfg.GetMasterNode()
      elif field == "drain_flag":
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
      else:
        raise errors.ParameterError(field)
      values.append(entry)
    return values

class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info

def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @return: a tuple of (disks_ok, device_info), where disks_ok is False if
      the operation failed and device_info is a list of
      (host, instance_visible_name, node_visible_name)
      triples with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two-pass mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
      msg = result.RemoteFailMsg()
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=False, pass=1): %s",
                           inst_disk.iv_name, node, msg)
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      lu.cfg.SetDiskID(node_disk, node)
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
      msg = result.RemoteFailMsg()
      if msg:
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
                           " (is_primary=True, pass=2): %s",
                           inst_disk.iv_name, node, msg)
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name,
                        result.payload))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    lu.cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info

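# Illustrative sketch of the _AssembleInstanceDisks return value (the device
# and node names are hypothetical): for a healthy single-disk DRBD instance
# it returns something like
#
#   (True, [("node1.example.com", "disk/0", "/dev/drbd0")])
#
# i.e. the overall success flag plus one (node, iv_name, assemble payload)
# triple per disk, taken from the primary-node call in the second pass.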
def _StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(lu, instance)
    if force is not None and not force:
      lu.proc.LogWarning("", hint="If the message above refers to a"
                         " secondary node,"
                         " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")

class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    instance = self.instance
    _SafeShutdownInstanceDisks(self, instance)

def _SafeShutdownInstanceDisks(lu, instance):
  """Shutdown block devices of an instance.

  This function checks if an instance is running, before calling
  _ShutdownInstanceDisks.

  """
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
                                      [instance.hypervisor])
  ins_l = ins_l[instance.primary_node]
  if ins_l.failed or not isinstance(ins_l.data, list):
    raise errors.OpExecError("Can't contact node '%s'" %
                             instance.primary_node)

  if instance.name in ins_l.data:
    raise errors.OpExecError("Instance is running, can't shutdown"
                             " block devices.")

  _ShutdownInstanceDisks(lu, instance)

def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  Errors on the primary node are ignored only if ignore_primary is
  true; errors on any other node always mark the operation as failed.

  """
  all_result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(top_disk, node)
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
      msg = result.RemoteFailMsg()
      if msg:
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                      disk.iv_name, node, msg)
        if not ignore_primary or node != instance.primary_node:
          all_result = False
  return all_result

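# Illustrative sketch (hypothetical helper and log text, nothing calls it):
# forcing a best-effort disk shutdown while tolerating failures on the
# primary node, e.g. when that node is already known to be unreachable.
def _ExampleBestEffortDiskShutdown(lu, instance):
  """Shut down the instance's disks, ignoring errors on the primary node.

  """
  if not _ShutdownInstanceDisks(lu, instance, ignore_primary=True):
    lu.LogWarning("Some block devices of instance %s could not be shut down",
                  instance.name)
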
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
  """Checks if a node has enough free memory.

  This function checks whether a given node has the needed amount of
  free memory. If the node has less memory, or if we cannot get the
  information from the node, an OpPrereqError exception is raised.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node: C{str}
  @param node: the node to check
  @type reason: C{str}
  @param reason: string to use in the error message
  @type requested: C{int}
  @param requested: the amount of memory in MiB to check for
  @type hypervisor_name: C{str}
  @param hypervisor_name: the hypervisor to ask for memory stats
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
      we cannot check the node

  """
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
  nodeinfo[node].Raise()
  free_mem = nodeinfo[node].data.get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                             " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                             " needed %s MiB, available %s MiB" %
                             (node, reason, requested, free_mem))

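# Illustrative sketch of the typical _CheckNodeFreeMemory call from an LU's
# CheckPrereq (compare LUStartupInstance below); the 2048 MiB figure is a
# hypothetical example, real callers derive it from the instance's BE_MEMORY
# backend parameter. Nothing in the module uses this helper.
def _ExampleCheckMemoryForStart(lu, instance):
  """Require 2048 MiB of free memory on the instance's primary node.

  """
  _CheckNodeFreeMemory(lu, instance.primary_node,
                       "starting instance %s" % instance.name,
                       2048, instance.hypervisor)
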
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    _CheckNodeOnline(self, instance.primary_node)

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

    _CheckNodeFreeMemory(self, instance.primary_node,
                         "starting instance %s" % instance.name,
                         bep[constants.BE_MEMORY], instance.hypervisor)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self, instance, force)

    result = self.rpc.call_instance_start(node_current, instance)
    msg = result.RemoteFailMsg()
    if msg:
      _ShutdownInstanceDisks(self, instance)
      raise errors.OpExecError("Could not start instance: %s" % msg)

class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      "REBOOT_TYPE": self.op.reboot_type,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    _CheckNodeOnline(self, instance.primary_node)

    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type

    node_current = instance.primary_node

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      for disk in instance.disks:
        self.cfg.SetDiskID(disk, node_current)
      result = self.rpc.call_instance_reboot(node_current, instance,
                                             reboot_type)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not reboot instance: %s" % msg)
    else:
      result = self.rpc.call_instance_shutdown(node_current, instance)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not shutdown instance for"
                                 " full reboot: %s" % msg)
      _ShutdownInstanceDisks(self, instance)
      _StartInstanceDisks(self, instance, ignore_secondaries)
      result = self.rpc.call_instance_start(node_current, instance)
      msg = result.RemoteFailMsg()
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance for"
                                 " full reboot: %s" % msg)

    self.cfg.MarkInstanceUp(instance.name)

class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    result = self.rpc.call_instance_shutdown(node_current, instance)
    msg = result.RemoteFailMsg()
    if msg:
      self.proc.LogWarning("Could not shutdown instance: %s" % msg)

    _ShutdownInstanceDisks(self, instance)

2920
class LUReinstallInstance(LogicalUnit):
2921
  """Reinstall an instance.
2922

2923
  """
2924
  HPATH = "instance-reinstall"
2925
  HTYPE = constants.HTYPE_INSTANCE
2926
  _OP_REQP = ["instance_name"]
2927
  REQ_BGL = False
2928

    
2929
  def ExpandNames(self):
2930
    self._ExpandAndLockInstance()
2931

    
2932
  def BuildHooksEnv(self):
2933
    """Build hooks env.
2934

2935
    This runs on master, primary and secondary nodes of the instance.
2936

2937
    """
2938
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2939
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2940
    return env, nl, nl
2941

    
2942
  def CheckPrereq(self):
2943
    """Check prerequisites.
2944

2945
    This checks that the instance is in the cluster and is not running.
2946

2947
    """
2948
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2949
    assert instance is not None, \
2950
      "Cannot retrieve locked instance %s" % self.op.instance_name
2951
    _CheckNodeOnline(self, instance.primary_node)
2952

    
2953
    if instance.disk_template == constants.DT_DISKLESS:
2954
      raise errors.OpPrereqError("Instance '%s' has no disks" %
2955
                                 self.op.instance_name)
2956
    if instance.admin_up:
2957
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2958
                                 self.op.instance_name)
2959
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2960
                                              instance.name,
2961
                                              instance.hypervisor)
2962
    if remote_info.failed or remote_info.data:
2963
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2964
                                 (self.op.instance_name,
2965
                                  instance.primary_node))
2966

    
2967
    self.op.os_type = getattr(self.op, "os_type", None)
2968
    if self.op.os_type is not None:
2969
      # OS verification
2970
      pnode = self.cfg.GetNodeInfo(
2971
        self.cfg.ExpandNodeName(instance.primary_node))
2972
      if pnode is None:
2973
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
2974
                                   self.op.pnode)
2975
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
2976
      result.Raise()
2977
      if not isinstance(result.data, objects.OS):
2978
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2979
                                   " primary node"  % self.op.os_type)
2980

    
2981
    self.instance = instance
2982

    
2983
  def Exec(self, feedback_fn):
2984
    """Reinstall the instance.
2985

2986
    """
2987
    inst = self.instance
2988

    
2989
    if self.op.os_type is not None:
2990
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2991
      inst.os = self.op.os_type
2992
      self.cfg.Update(inst)
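      # The OS change is committed to the configuration before the create
      # scripts run, so even a failed reinstall leaves the new OS recorded.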
2993

    
2994
    _StartInstanceDisks(self, inst, None)
2995
    try:
2996
      feedback_fn("Running the instance OS create scripts...")
2997
      result = self.rpc.call_instance_os_add(inst.primary_node, inst)
2998
      msg = result.RemoteFailMsg()
2999
      if msg:
3000
        raise errors.OpExecError("Could not install OS for instance %s"
3001
                                 " on node %s: %s" %
3002
                                 (inst.name, inst.primary_node, msg))
3003
    finally:
3004
      _ShutdownInstanceDisks(self, inst)
3005

    
3006

    
3007
class LURenameInstance(LogicalUnit):
3008
  """Rename an instance.
3009

3010
  """
3011
  HPATH = "instance-rename"
3012
  HTYPE = constants.HTYPE_INSTANCE
3013
  _OP_REQP = ["instance_name", "new_name"]
3014

    
3015
  def BuildHooksEnv(self):
3016
    """Build hooks env.
3017

3018
    This runs on master, primary and secondary nodes of the instance.
3019

3020
    """
3021
    env = _BuildInstanceHookEnvByObject(self, self.instance)
3022
    env["INSTANCE_NEW_NAME"] = self.op.new_name
3023
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3024
    return env, nl, nl
3025

    
3026
  def CheckPrereq(self):
3027
    """Check prerequisites.
3028

3029
    This checks that the instance is in the cluster and is not running.
3030

3031
    """
3032
    instance = self.cfg.GetInstanceInfo(
3033
      self.cfg.ExpandInstanceName(self.op.instance_name))
3034
    if instance is None:
3035
      raise errors.OpPrereqError("Instance '%s' not known" %
3036
                                 self.op.instance_name)
3037
    _CheckNodeOnline(self, instance.primary_node)
3038

    
3039
    if instance.admin_up:
3040
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3041
                                 self.op.instance_name)
3042
    remote_info = self.rpc.call_instance_info(instance.primary_node,
3043
                                              instance.name,
3044
                                              instance.hypervisor)
3045
    remote_info.Raise()
3046
    if remote_info.data:
3047
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3048
                                 (self.op.instance_name,
3049
                                  instance.primary_node))
3050
    self.instance = instance
3051

    
3052
    # new name verification
3053
    name_info = utils.HostInfo(self.op.new_name)
3054

    
3055
    self.op.new_name = new_name = name_info.name
3056
    instance_list = self.cfg.GetInstanceList()
3057
    if new_name in instance_list:
3058
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3059
                                 new_name)
3060

    
3061
    if not getattr(self.op, "ignore_ip", False):
3062
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3063
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3064
                                   (name_info.ip, new_name))
3065

    
3066

    
3067
  def Exec(self, feedback_fn):
3068
    """Reinstall the instance.
3069

3070
    """
3071
    inst = self.instance
3072
    old_name = inst.name
3073

    
3074
    if inst.disk_template == constants.DT_FILE:
3075
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3076

    
3077
    self.cfg.RenameInstance(inst.name, self.op.new_name)
3078
    # Change the instance lock. This is definitely safe while we hold the BGL
3079
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3080
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3081

    
3082
    # re-read the instance from the configuration after rename
3083
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
3084

    
3085
    if inst.disk_template == constants.DT_FILE:
3086
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3087
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3088
                                                     old_file_storage_dir,
3089
                                                     new_file_storage_dir)
3090
      result.Raise()
3091
      if not result.data:
3092
        raise errors.OpExecError("Could not connect to node '%s' to rename"
3093
                                 " directory '%s' to '%s' (but the instance"
3094
                                 " has been renamed in Ganeti)" % (
3095
                                 inst.primary_node, old_file_storage_dir,
3096
                                 new_file_storage_dir))
3097

    
3098
      if not result.data[0]:
3099
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3100
                                 " (but the instance has been renamed in"
3101
                                 " Ganeti)" % (old_file_storage_dir,
3102
                                               new_file_storage_dir))
3103

    
3104
    _StartInstanceDisks(self, inst, None)
3105
    try:
3106
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3107
                                                 old_name)
3108
      msg = result.RemoteFailMsg()
3109
      if msg:
3110
        msg = ("Could not run OS rename script for instance %s on node %s"
3111
               " (but the instance has been renamed in Ganeti): %s" %
3112
               (inst.name, inst.primary_node, msg))
3113
        self.proc.LogWarning(msg)
3114
    finally:
3115
      _ShutdownInstanceDisks(self, inst)
3116

    
3117

    
3118
class LURemoveInstance(LogicalUnit):
3119
  """Remove an instance.
3120

3121
  """
3122
  HPATH = "instance-remove"
3123
  HTYPE = constants.HTYPE_INSTANCE
3124
  _OP_REQP = ["instance_name", "ignore_failures"]
3125
  REQ_BGL = False
3126

    
3127
  def ExpandNames(self):
3128
    self._ExpandAndLockInstance()
3129
    self.needed_locks[locking.LEVEL_NODE] = []
3130
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
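    # An empty node lock list plus LOCKS_REPLACE defers the node locks to
    # DeclareLocks, where _LockInstancesNodes narrows them down to the nodes
    # this instance actually uses.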
3131

    
3132
  def DeclareLocks(self, level):
3133
    if level == locking.LEVEL_NODE:
3134
      self._LockInstancesNodes()
3135

    
3136
  def BuildHooksEnv(self):
3137
    """Build hooks env.
3138

3139
    This runs on master, primary and secondary nodes of the instance.
3140

3141
    """
3142
    env = _BuildInstanceHookEnvByObject(self, self.instance)
3143
    nl = [self.cfg.GetMasterNode()]
3144
    return env, nl, nl
3145

    
3146
  def CheckPrereq(self):
3147
    """Check prerequisites.
3148

3149
    This checks that the instance is in the cluster.
3150

3151
    """
3152
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3153
    assert self.instance is not None, \
3154
      "Cannot retrieve locked instance %s" % self.op.instance_name
3155

    
3156
  def Exec(self, feedback_fn):
3157
    """Remove the instance.
3158

3159
    """
3160
    instance = self.instance
3161
    logging.info("Shutting down instance %s on node %s",
3162
                 instance.name, instance.primary_node)
3163

    
3164
    result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3165
    msg = result.RemoteFailMsg()
3166
    if msg:
3167
      if self.op.ignore_failures:
3168
        feedback_fn("Warning: can't shutdown instance: %s" % msg)
3169
      else:
3170
        raise errors.OpExecError("Could not shutdown instance %s on"
3171
                                 " node %s: %s" %
3172
                                 (instance.name, instance.primary_node, msg))
3173

    
3174
    logging.info("Removing block devices for instance %s", instance.name)
3175

    
3176
    if not _RemoveDisks(self, instance):
3177
      if self.op.ignore_failures:
3178
        feedback_fn("Warning: can't remove instance's disks")
3179
      else:
3180
        raise errors.OpExecError("Can't remove instance's disks")
3181

    
3182
    logging.info("Removing instance %s out of cluster config", instance.name)
3183

    
3184
    self.cfg.RemoveInstance(instance.name)
3185
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3186

    
3187

    
3188
class LUQueryInstances(NoHooksLU):
3189
  """Logical unit for querying instances.
3190

3191
  """
3192
  _OP_REQP = ["output_fields", "names", "use_locking"]
3193
  REQ_BGL = False
3194
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3195
                                    "admin_state",
3196
                                    "disk_template", "ip", "mac", "bridge",
3197
                                    "sda_size", "sdb_size", "vcpus", "tags",
3198
                                    "network_port", "beparams",
3199
                                    r"(disk)\.(size)/([0-9]+)",
3200
                                    r"(disk)\.(sizes)", "disk_usage",
3201
                                    r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3202
                                    r"(nic)\.(macs|ips|bridges)",
3203
                                    r"(disk|nic)\.(count)",
3204
                                    "serial_no", "hypervisor", "hvparams",] +
3205
                                  ["hv/%s" % name
3206
                                   for name in constants.HVS_PARAMETERS] +
3207
                                  ["be/%s" % name
3208
                                   for name in constants.BES_PARAMETERS])
3209
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
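  # Static fields are answered from the configuration alone; the dynamic
  # fields above need live data from the nodes. As an illustrative example,
  # a query for ["name", "status"] triggers node RPCs, while one for
  # ["name", "disk.sizes"] does not.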
3210

    
3211

    
3212
  def ExpandNames(self):
3213
    _CheckOutputFields(static=self._FIELDS_STATIC,
3214
                       dynamic=self._FIELDS_DYNAMIC,
3215
                       selected=self.op.output_fields)
3216

    
3217
    self.needed_locks = {}
3218
    self.share_locks[locking.LEVEL_INSTANCE] = 1
3219
    self.share_locks[locking.LEVEL_NODE] = 1
3220

    
3221
    if self.op.names:
3222
      self.wanted = _GetWantedInstances(self, self.op.names)
3223
    else:
3224
      self.wanted = locking.ALL_SET
3225

    
3226
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3227
    self.do_locking = self.do_node_query and self.op.use_locking
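    # Queries touching only static (configuration) fields skip both the node
    # RPCs and the instance/node locks, keeping them cheap.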
3228
    if self.do_locking:
3229
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3230
      self.needed_locks[locking.LEVEL_NODE] = []
3231
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3232

    
3233
  def DeclareLocks(self, level):
3234
    if level == locking.LEVEL_NODE and self.do_locking:
3235
      self._LockInstancesNodes()
3236

    
3237
  def CheckPrereq(self):
3238
    """Check prerequisites.
3239

3240
    """
3241
    pass
3242

    
3243
  def Exec(self, feedback_fn):
3244
    """Computes the list of nodes and their attributes.
3245

3246
    """
3247
    all_info = self.cfg.GetAllInstancesInfo()
3248
    if self.wanted == locking.ALL_SET:
3249
      # caller didn't specify instance names, so ordering is not important
3250
      if self.do_locking:
3251
        instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3252
      else:
3253
        instance_names = all_info.keys()
3254
      instance_names = utils.NiceSort(instance_names)
3255
    else:
3256
      # caller did specify names, so we must keep the ordering
3257
      if self.do_locking:
3258
        tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3259
      else:
3260
        tgt_set = all_info.keys()
3261
      missing = set(self.wanted).difference(tgt_set)
3262
      if missing:
3263
        raise errors.OpExecError("Some instances were removed before"
3264
                                 " retrieving their data: %s" % missing)
3265
      instance_names = self.wanted
3266

    
3267
    instance_list = [all_info[iname] for iname in instance_names]
3268

    
3269
    # begin data gathering
3270

    
3271
    nodes = frozenset([inst.primary_node for inst in instance_list])
3272
    hv_list = list(set([inst.hypervisor for inst in instance_list]))
3273

    
3274
    bad_nodes = []
3275
    off_nodes = []
3276
    if self.do_node_query:
3277
      live_data = {}
3278
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3279
      for name in nodes:
3280
        result = node_data[name]
3281
        if result.offline:
3282
          # offline nodes will be in both lists
3283
          off_nodes.append(name)
3284
        if result.failed:
3285
          bad_nodes.append(name)
3286
        else:
3287
          if result.data:
3288
            live_data.update(result.data)
3289
            # else no instance is alive
3290
    else:
3291
      live_data = dict([(name, {}) for name in instance_names])
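      # No live data needed: use empty per-instance dicts so the field loop
      # below can treat both code paths uniformly.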
3292

    
3293
    # end data gathering
3294

    
3295
    HVPREFIX = "hv/"
3296
    BEPREFIX = "be/"
3297
    output = []
3298
    for instance in instance_list:
3299
      iout = []
3300
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3301
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
3302
      for field in self.op.output_fields:
3303
        st_match = self._FIELDS_STATIC.Matches(field)
3304
        if field == "name":
3305
          val = instance.name
3306
        elif field == "os":
3307
          val = instance.os
3308
        elif field == "pnode":
3309
          val = instance.primary_node
3310
        elif field == "snodes":
3311
          val = list(instance.secondary_nodes)
3312
        elif field == "admin_state":
3313
          val = instance.admin_up
3314
        elif field == "oper_state":
3315
          if instance.primary_node in bad_nodes:
3316
            val = None
3317
          else:
3318
            val = bool(live_data.get(instance.name))
3319
        elif field == "status":
3320
          if instance.primary_node in off_nodes:
3321
            val = "ERROR_nodeoffline"
3322
          elif instance.primary_node in bad_nodes:
3323
            val = "ERROR_nodedown"
3324
          else:
3325
            running = bool(live_data.get(instance.name))
3326
            if running:
3327
              if instance.admin_up:
3328
                val = "running"
3329
              else:
3330
                val = "ERROR_up"
3331
            else:
3332
              if instance.admin_up:
3333
                val = "ERROR_down"
3334
              else:
3335
                val = "ADMIN_down"
3336
        elif field == "oper_ram":
3337
          if instance.primary_node in bad_nodes:
3338
            val = None
3339
          elif instance.name in live_data:
3340
            val = live_data[instance.name].get("memory", "?")
3341
          else:
3342
            val = "-"
3343
        elif field == "disk_template":
3344
          val = instance.disk_template
3345
        elif field == "ip":
3346
          val = instance.nics[0].ip
3347
        elif field == "bridge":
3348
          val = instance.nics[0].bridge
3349
        elif field == "mac":
3350
          val = instance.nics[0].mac
3351
        elif field == "sda_size" or field == "sdb_size":
3352
          idx = ord(field[2]) - ord('a')
3353
          try:
3354
            val = instance.FindDisk(idx).size
3355
          except errors.OpPrereqError:
3356
            val = None
3357
        elif field == "disk_usage": # total disk usage per node
3358
          disk_sizes = [{'size': disk.size} for disk in instance.disks]
3359
          val = _ComputeDiskSize(instance.disk_template, disk_sizes)
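          # This reports the space consumed in the node's volume group, which
          # for drbd8 includes the per-disk metadata overhead (see
          # _ComputeDiskSize below).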
3360
        elif field == "tags":
3361
          val = list(instance.GetTags())
3362
        elif field == "serial_no":
3363
          val = instance.serial_no
3364
        elif field == "network_port":
3365
          val = instance.network_port
3366
        elif field == "hypervisor":
3367
          val = instance.hypervisor
3368
        elif field == "hvparams":
3369
          val = i_hv
3370
        elif (field.startswith(HVPREFIX) and
3371
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3372
          val = i_hv.get(field[len(HVPREFIX):], None)
3373
        elif field == "beparams":
3374
          val = i_be
3375
        elif (field.startswith(BEPREFIX) and
3376
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3377
          val = i_be.get(field[len(BEPREFIX):], None)
3378
        elif st_match and st_match.groups():
3379
          # matches a variable list
3380
          st_groups = st_match.groups()
3381
          if st_groups and st_groups[0] == "disk":
3382
            if st_groups[1] == "count":
3383
              val = len(instance.disks)
3384
            elif st_groups[1] == "sizes":
3385
              val = [disk.size for disk in instance.disks]
3386
            elif st_groups[1] == "size":
3387
              try:
3388
                val = instance.FindDisk(st_groups[2]).size
3389
              except errors.OpPrereqError:
3390
                val = None
3391
            else:
3392
              assert False, "Unhandled disk parameter"
3393
          elif st_groups[0] == "nic":
3394
            if st_groups[1] == "count":
3395
              val = len(instance.nics)
3396
            elif st_groups[1] == "macs":
3397
              val = [nic.mac for nic in instance.nics]
3398
            elif st_groups[1] == "ips":
3399
              val = [nic.ip for nic in instance.nics]
3400
            elif st_groups[1] == "bridges":
3401
              val = [nic.bridge for nic in instance.nics]
3402
            else:
3403
              # index-based item
3404
              nic_idx = int(st_groups[2])
3405
              if nic_idx >= len(instance.nics):
3406
                val = None
3407
              else:
3408
                if st_groups[1] == "mac":
3409
                  val = instance.nics[nic_idx].mac
3410
                elif st_groups[1] == "ip":
3411
                  val = instance.nics[nic_idx].ip
3412
                elif st_groups[1] == "bridge":
3413
                  val = instance.nics[nic_idx].bridge
3414
                else:
3415
                  assert False, "Unhandled NIC parameter"
3416
          else:
3417
            assert False, "Unhandled variable parameter"
3418
        else:
3419
          raise errors.ParameterError(field)
3420
        iout.append(val)
3421
      output.append(iout)
3422

    
3423
    return output
3424

    
3425

    
3426
class LUFailoverInstance(LogicalUnit):
3427
  """Failover an instance.
3428

3429
  """
3430
  HPATH = "instance-failover"
3431
  HTYPE = constants.HTYPE_INSTANCE
3432
  _OP_REQP = ["instance_name", "ignore_consistency"]
3433
  REQ_BGL = False
3434

    
3435
  def ExpandNames(self):
3436
    self._ExpandAndLockInstance()
3437
    self.needed_locks[locking.LEVEL_NODE] = []
3438
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3439

    
3440
  def DeclareLocks(self, level):
3441
    if level == locking.LEVEL_NODE:
3442
      self._LockInstancesNodes()
3443

    
3444
  def BuildHooksEnv(self):
3445
    """Build hooks env.
3446

3447
    This runs on master, primary and secondary nodes of the instance.
3448

3449
    """
3450
    env = {
3451
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3452
      }
3453
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3454
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3455
    return env, nl, nl
3456

    
3457
  def CheckPrereq(self):
3458
    """Check prerequisites.
3459

3460
    This checks that the instance is in the cluster.
3461

3462
    """
3463
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3464
    assert self.instance is not None, \
3465
      "Cannot retrieve locked instance %s" % self.op.instance_name
3466

    
3467
    bep = self.cfg.GetClusterInfo().FillBE(instance)
3468
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3469
      raise errors.OpPrereqError("Instance's disk layout is not"
3470
                                 " network mirrored, cannot failover.")
3471

    
3472
    secondary_nodes = instance.secondary_nodes
3473
    if not secondary_nodes:
3474
      raise errors.ProgrammerError("no secondary node but using "
3475
                                   "a mirrored disk template")
3476

    
3477
    target_node = secondary_nodes[0]
3478
    _CheckNodeOnline(self, target_node)
3479
    _CheckNodeNotDrained(self, target_node)
3480
    # check memory requirements on the secondary node
3481
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3482
                         instance.name, bep[constants.BE_MEMORY],
3483
                         instance.hypervisor)
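    # The memory check uses the filled beparams, so the cluster defaults
    # apply whenever the instance does not override BE_MEMORY itself.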
3484

    
3485
    # check bridge existence
3486
    brlist = [nic.bridge for nic in instance.nics]
3487
    result = self.rpc.call_bridges_exist(target_node, brlist)
3488
    result.Raise()
3489
    if not result.data:
3490
      raise errors.OpPrereqError("One or more target bridges %s does not"
3491
                                 " exist on destination node '%s'" %
3492
                                 (brlist, target_node))
3493

    
3494
  def Exec(self, feedback_fn):
3495
    """Failover an instance.
3496

3497
    The failover is done by shutting it down on its present node and
3498
    starting it on the secondary.
3499

3500
    """
3501
    instance = self.instance
3502

    
3503
    source_node = instance.primary_node
3504
    target_node = instance.secondary_nodes[0]
3505

    
3506
    feedback_fn("* checking disk consistency between source and target")
3507
    for dev in instance.disks:
3508
      # for drbd, these are drbd over lvm
3509
      if not _CheckDiskConsistency(self, dev, target_node, False):
3510
        if instance.admin_up and not self.op.ignore_consistency:
3511
          raise errors.OpExecError("Disk %s is degraded on target node,"
3512
                                   " aborting failover." % dev.iv_name)
3513

    
3514
    feedback_fn("* shutting down instance on source node")
3515
    logging.info("Shutting down instance %s on node %s",
3516
                 instance.name, source_node)
3517

    
3518
    result = self.rpc.call_instance_shutdown(source_node, instance)
3519
    msg = result.RemoteFailMsg()
3520
    if msg:
3521
      if self.op.ignore_consistency:
3522
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
3523
                             " Proceeding anyway. Please make sure node"
3524
                             " %s is down. Error details: %s",
3525
                             instance.name, source_node, source_node, msg)
3526
      else:
3527
        raise errors.OpExecError("Could not shutdown instance %s on"
3528
                                 " node %s: %s" %
3529
                                 (instance.name, source_node, msg))
3530

    
3531
    feedback_fn("* deactivating the instance's disks on source node")
3532
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3533
      raise errors.OpExecError("Can't shut down the instance's disks.")
3534

    
3535
    instance.primary_node = target_node
3536
    # distribute new instance config to the other nodes
3537
    self.cfg.Update(instance)
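    # The primary node is flipped in the configuration before any attempt to
    # start the instance on the target, so a later start failure still leaves
    # the cluster-wide view consistent with the intended failover result.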
3538

    
3539
    # Only start the instance if it's marked as up
3540
    if instance.admin_up:
3541
      feedback_fn("* activating the instance's disks on target node")
3542
      logging.info("Starting instance %s on node %s",
3543
                   instance.name, target_node)
3544

    
3545
      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3546
                                               ignore_secondaries=True)
3547
      if not disks_ok:
3548
        _ShutdownInstanceDisks(self, instance)
3549
        raise errors.OpExecError("Can't activate the instance's disks")
3550

    
3551
      feedback_fn("* starting the instance on the target node")
3552
      result = self.rpc.call_instance_start(target_node, instance)
3553
      msg = result.RemoteFailMsg()
3554
      if msg:
3555
        _ShutdownInstanceDisks(self, instance)
3556
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3557
                                 (instance.name, target_node, msg))
3558

    
3559

    
3560
class LUMigrateInstance(LogicalUnit):
3561
  """Migrate an instance.
3562

3563
  This is migration without shutting down, compared to the failover,
3564
  which is done with shutdown.
3565

3566
  """
3567
  HPATH = "instance-migrate"
3568
  HTYPE = constants.HTYPE_INSTANCE
3569
  _OP_REQP = ["instance_name", "live", "cleanup"]
3570

    
3571
  REQ_BGL = False
3572

    
3573
  def ExpandNames(self):
3574
    self._ExpandAndLockInstance()
3575
    self.needed_locks[locking.LEVEL_NODE] = []
3576
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3577

    
3578
  def DeclareLocks(self, level):
3579
    if level == locking.LEVEL_NODE:
3580
      self._LockInstancesNodes()
3581

    
3582
  def BuildHooksEnv(self):
3583
    """Build hooks env.
3584

3585
    This runs on master, primary and secondary nodes of the instance.
3586

3587
    """
3588
    env = _BuildInstanceHookEnvByObject(self, self.instance)
3589
    env["MIGRATE_LIVE"] = self.op.live
3590
    env["MIGRATE_CLEANUP"] = self.op.cleanup
3591
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3592
    return env, nl, nl
3593

    
3594
  def CheckPrereq(self):
3595
    """Check prerequisites.
3596

3597
    This checks that the instance is in the cluster.
3598

3599
    """
3600
    instance = self.cfg.GetInstanceInfo(
3601
      self.cfg.ExpandInstanceName(self.op.instance_name))
3602
    if instance is None:
3603
      raise errors.OpPrereqError("Instance '%s' not known" %
3604
                                 self.op.instance_name)
3605

    
3606
    if instance.disk_template != constants.DT_DRBD8:
3607
      raise errors.OpPrereqError("Instance's disk layout is not"
3608
                                 " drbd8, cannot migrate.")
3609

    
3610
    secondary_nodes = instance.secondary_nodes
3611
    if not secondary_nodes:
3612
      raise errors.ConfigurationError("No secondary node but using"
3613
                                      " drbd8 disk template")
3614

    
3615
    i_be = self.cfg.GetClusterInfo().FillBE(instance)
3616

    
3617
    target_node = secondary_nodes[0]
3618
    # check memory requirements on the secondary node
3619
    _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3620
                         instance.name, i_be[constants.BE_MEMORY],
3621
                         instance.hypervisor)
3622

    
3623
    # check bridge existence
3624
    brlist = [nic.bridge for nic in instance.nics]
3625
    result = self.rpc.call_bridges_exist(target_node, brlist)
3626
    if result.failed or not result.data:
3627
      raise errors.OpPrereqError("One or more target bridges %s does not"
3628
                                 " exist on destination node '%s'" %
3629
                                 (brlist, target_node))
3630

    
3631
    if not self.op.cleanup:
3632
      _CheckNodeNotDrained(self, target_node)
3633
      result = self.rpc.call_instance_migratable(instance.primary_node,
3634
                                                 instance)
3635
      msg = result.RemoteFailMsg()
3636
      if msg:
3637
        raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3638
                                   msg)
3639

    
3640
    self.instance = instance
3641

    
3642
  def _WaitUntilSync(self):
3643
    """Poll with custom rpc for disk sync.
3644

3645
    This uses our own step-based rpc call.
3646

3647
    """
3648
    self.feedback_fn("* wait until resync is done")
3649
    all_done = False
3650
    while not all_done:
3651
      all_done = True
3652
      result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3653
                                            self.nodes_ip,
3654
                                            self.instance.disks)
3655
      min_percent = 100
3656
      for node, nres in result.items():
3657
        msg = nres.RemoteFailMsg()
3658
        if msg:
3659
          raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3660
                                   (node, msg))
3661
        node_done, node_percent = nres.payload
3662
        all_done = all_done and node_done
3663
        if node_percent is not None:
3664
          min_percent = min(min_percent, node_percent)
3665
      if not all_done:
3666
        if min_percent < 100:
3667
          self.feedback_fn("   - progress: %.1f%%" % min_percent)
3668
        time.sleep(2)
3669

    
3670
  def _EnsureSecondary(self, node):
3671
    """Demote a node to secondary.
3672

3673
    """
3674
    self.feedback_fn("* switching node %s to secondary mode" % node)
3675

    
3676
    for dev in self.instance.disks:
3677
      self.cfg.SetDiskID(dev, node)
3678

    
3679
    result = self.rpc.call_blockdev_close(node, self.instance.name,
3680
                                          self.instance.disks)
3681
    msg = result.RemoteFailMsg()
3682
    if msg:
3683
      raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3684
                               " error %s" % (node, msg))
3685

    
3686
  def _GoStandalone(self):
3687
    """Disconnect from the network.
3688

3689
    """
3690
    self.feedback_fn("* changing into standalone mode")
3691
    result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3692
                                               self.instance.disks)
3693
    for node, nres in result.items():
3694
      msg = nres.RemoteFailMsg()
3695
      if msg:
3696
        raise errors.OpExecError("Cannot disconnect disks node %s,"
3697
                                 " error %s" % (node, msg))
3698

    
3699
  def _GoReconnect(self, multimaster):
3700
    """Reconnect to the network.
3701

3702
    """
3703
    if multimaster:
3704
      msg = "dual-master"
3705
    else:
3706
      msg = "single-master"
3707
    self.feedback_fn("* changing disks into %s mode" % msg)
3708
    result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3709
                                           self.instance.disks,
3710
                                           self.instance.name, multimaster)
3711
    for node, nres in result.items():
3712
      msg = nres.RemoteFailMsg()
3713
      if msg:
3714
        raise errors.OpExecError("Cannot change disks config on node %s,"
3715
                                 " error: %s" % (node, msg))
3716

    
3717
  def _ExecCleanup(self):
3718
    """Try to cleanup after a failed migration.
3719

3720
    The cleanup is done by:
3721
      - check that the instance is running only on one node
3722
        (and update the config if needed)
3723
      - change disks on its secondary node to secondary
3724
      - wait until disks are fully synchronized
3725
      - disconnect from the network
3726
      - change disks into single-master mode
3727
      - wait again until disks are fully synchronized
3728

3729
    """
3730
    instance = self.instance
3731
    target_node = self.target_node
3732
    source_node = self.source_node
3733

    
3734
    # check running on only one node
3735
    self.feedback_fn("* checking where the instance actually runs"
3736
                     " (if this hangs, the hypervisor might be in"
3737
                     " a bad state)")
3738
    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3739
    for node, result in ins_l.items():
3740
      result.Raise()
3741
      if not isinstance(result.data, list):
3742
        raise errors.OpExecError("Can't contact node '%s'" % node)
3743

    
3744
    runningon_source = instance.name in ins_l[source_node].data
3745
    runningon_target = instance.name in ins_l[target_node].data
3746

    
3747
    if runningon_source and runningon_target:
3748
      raise errors.OpExecError("Instance seems to be running on two nodes,"
3749
                               " or the hypervisor is confused. You will have"
3750
                               " to ensure manually that it runs only on one"
3751
                               " and restart this operation.")
3752

    
3753
    if not (runningon_source or runningon_target):
3754
      raise errors.OpExecError("Instance does not seem to be running at all."
3755
                               " In this case, it's safer to repair by"
3756
                               " running 'gnt-instance stop' to ensure disk"
3757
                               " shutdown, and then restarting it.")
3758

    
3759
    if runningon_target:
3760
      # the migration has actually succeeded, we need to update the config
3761
      self.feedback_fn("* instance running on secondary node (%s),"
3762
                       " updating config" % target_node)
3763
      instance.primary_node = target_node
3764
      self.cfg.Update(instance)
3765
      demoted_node = source_node
3766
    else:
3767
      self.feedback_fn("* instance confirmed to be running on its"
3768
                       " primary node (%s)" % source_node)
3769
      demoted_node = target_node
3770

    
3771
    self._EnsureSecondary(demoted_node)
3772
    try:
3773
      self._WaitUntilSync()
3774
    except errors.OpExecError:
3775
      # we ignore here errors, since if the device is standalone, it
3776
      # won't be able to sync
3777
      pass
3778
    self._GoStandalone()
3779
    self._GoReconnect(False)
3780
    self._WaitUntilSync()
3781

    
3782
    self.feedback_fn("* done")
3783

    
3784
  def _RevertDiskStatus(self):
3785
    """Try to revert the disk status after a failed migration.
3786

3787
    """
3788
    target_node = self.target_node
3789
    try:
3790
      self._EnsureSecondary(target_node)
3791
      self._GoStandalone()
3792
      self._GoReconnect(False)
3793
      self._WaitUntilSync()
3794
    except errors.OpExecError, err:
3795
      self.LogWarning("Migration failed and I can't reconnect the"
3796
                      " drives: error '%s'\n"
3797
                      "Please look and recover the instance status" %
3798
                      str(err))
3799

    
3800
  def _AbortMigration(self):
3801
    """Call the hypervisor code to abort a started migration.
3802

3803
    """
3804
    instance = self.instance
3805
    target_node = self.target_node
3806
    migration_info = self.migration_info
3807

    
3808
    abort_result = self.rpc.call_finalize_migration(target_node,
3809
                                                    instance,
3810
                                                    migration_info,
3811
                                                    False)
3812
    abort_msg = abort_result.RemoteFailMsg()
3813
    if abort_msg:
3814
      logging.error("Aborting migration failed on target node %s: %s" %
3815
                    (target_node, abort_msg))
3816
      # Don't raise an exception here, as we still have to try to revert the
3817
      # disk status, even if this step failed.
3818

    
3819
  def _ExecMigration(self):
3820
    """Migrate an instance.
3821

3822
    The migrate is done by:
3823
      - change the disks into dual-master mode
3824
      - wait until disks are fully synchronized again
3825
      - migrate the instance
3826
      - change disks on the new secondary node (the old primary) to secondary
3827
      - wait until disks are fully synchronized
3828
      - change disks into single-master mode
3829

3830
    """
3831
    instance = self.instance
3832
    target_node = self.target_node
3833
    source_node = self.source_node
3834

    
3835
    self.feedback_fn("* checking disk consistency between source and target")
3836
    for dev in instance.disks:
3837
      if not _CheckDiskConsistency(self, dev, target_node, False):
3838
        raise errors.OpExecError("Disk %s is degraded or not fully"
3839
                                 " synchronized on target node,"
3840
                                 " aborting migrate." % dev.iv_name)
3841

    
3842
    # First get the migration information from the remote node
3843
    result = self.rpc.call_migration_info(source_node, instance)
3844
    msg = result.RemoteFailMsg()
3845
    if msg:
3846
      log_err = ("Failed fetching source migration information from %s: %s" %
3847
                 (source_node, msg))
3848
      logging.error(log_err)
3849
      raise errors.OpExecError(log_err)
3850

    
3851
    self.migration_info = migration_info = result.payload
3852

    
3853
    # Then switch the disks to master/master mode
3854
    self._EnsureSecondary(target_node)
3855
    self._GoStandalone()
3856
    self._GoReconnect(True)
3857
    self._WaitUntilSync()
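    # Both nodes now hold the DRBD devices in dual-master (primary/primary)
    # mode, which is what allows the hypervisor to live-migrate the running
    # instance between them.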
3858

    
3859
    self.feedback_fn("* preparing %s to accept the instance" % target_node)
3860
    result = self.rpc.call_accept_instance(target_node,
3861
                                           instance,
3862
                                           migration_info,
3863
                                           self.nodes_ip[target_node])
3864

    
3865
    msg = result.RemoteFailMsg()
3866
    if msg:
3867
      logging.error("Instance pre-migration failed, trying to revert"
3868
                    " disk status: %s", msg)
3869
      self._AbortMigration()
3870
      self._RevertDiskStatus()
3871
      raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
3872
                               (instance.name, msg))
3873

    
3874
    self.feedback_fn("* migrating instance to %s" % target_node)
3875
    time.sleep(10)
3876
    result = self.rpc.call_instance_migrate(source_node, instance,
3877
                                            self.nodes_ip[target_node],
3878
                                            self.op.live)
3879
    msg = result.RemoteFailMsg()
3880
    if msg:
3881
      logging.error("Instance migration failed, trying to revert"
3882
                    " disk status: %s", msg)
3883
      self._AbortMigration()
3884
      self._RevertDiskStatus()
3885
      raise errors.OpExecError("Could not migrate instance %s: %s" %
3886
                               (instance.name, msg))
3887
    time.sleep(10)
3888

    
3889
    instance.primary_node = target_node
3890
    # distribute new instance config to the other nodes
3891
    self.cfg.Update(instance)
3892

    
3893
    result = self.rpc.call_finalize_migration(target_node,
3894
                                              instance,
3895
                                              migration_info,
3896
                                              True)
3897
    msg = result.RemoteFailMsg()
3898
    if msg:
3899
      logging.error("Instance migration succeeded, but finalization failed:"
3900
                    " %s" % msg)
3901
      raise errors.OpExecError("Could not finalize instance migration: %s" %
3902
                               msg)
3903

    
3904
    self._EnsureSecondary(source_node)
3905
    self._WaitUntilSync()
3906
    self._GoStandalone()
3907
    self._GoReconnect(False)
3908
    self._WaitUntilSync()
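    # The old primary is now demoted and the disks are back in single-master
    # mode, i.e. the pre-migration layout with the node roles swapped.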
3909

    
3910
    self.feedback_fn("* done")
3911

    
3912
  def Exec(self, feedback_fn):
3913
    """Perform the migration.
3914

3915
    """
3916
    self.feedback_fn = feedback_fn
3917

    
3918
    self.source_node = self.instance.primary_node
3919
    self.target_node = self.instance.secondary_nodes[0]
3920
    self.all_nodes = [self.source_node, self.target_node]
3921
    self.nodes_ip = {
3922
      self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
3923
      self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
3924
      }
3925
    if self.op.cleanup:
3926
      return self._ExecCleanup()
3927
    else:
3928
      return self._ExecMigration()
3929

    
3930

    
3931
def _CreateBlockDev(lu, node, instance, device, force_create,
3932
                    info, force_open):
3933
  """Create a tree of block devices on a given node.
3934

3935
  If this device type has to be created on secondaries, create it and
3936
  all its children.
3937

3938
  If not, just recurse to children keeping the same 'force' value.
3939

3940
  @param lu: the lu on whose behalf we execute
3941
  @param node: the node on which to create the device
3942
  @type instance: L{objects.Instance}
3943
  @param instance: the instance which owns the device
3944
  @type device: L{objects.Disk}
3945
  @param device: the device to create
3946
  @type force_create: boolean
3947
  @param force_create: whether to force creation of this device; this
3948
      will be changed to True whenever we find a device which has
3949
      CreateOnSecondary() attribute
3950
  @param info: the extra 'metadata' we should attach to the device
3951
      (this will be represented as a LVM tag)
3952
  @type force_open: boolean
3953
  @param force_open: this parameter will be passed to the
3954
      L{backend.BlockdevCreate} function where it specifies
3955
      whether we run on primary or not, and it affects both
3956
      the child assembly and the device's own Open() execution
3957

3958
  """
3959
  if device.CreateOnSecondary():
3960
    force_create = True
3961

    
3962
  if device.children:
3963
    for child in device.children:
3964
      _CreateBlockDev(lu, node, instance, child, force_create,
3965
                      info, force_open)
3966

    
3967
  if not force_create:
3968
    return
3969

    
3970
  _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
3971

    
3972

    
3973
def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
3974
  """Create a single block device on a given node.
3975

3976
  This will not recurse over children of the device, so they must be
3977
  created in advance.
3978

3979
  @param lu: the lu on whose behalf we execute
3980
  @param node: the node on which to create the device
3981
  @type instance: L{objects.Instance}
3982
  @param instance: the instance which owns the device
3983
  @type device: L{objects.Disk}
3984
  @param device: the device to create
3985
  @param info: the extra 'metadata' we should attach to the device
3986
      (this will be represented as a LVM tag)
3987
  @type force_open: boolean
3988
  @param force_open: this parameter will be passed to the
3989
      L{backend.BlockdevCreate} function where it specifies
3990
      whether we run on primary or not, and it affects both
3991
      the child assembly and the device's own Open() execution
3992

3993
  """
3994
  lu.cfg.SetDiskID(device, node)
3995
  result = lu.rpc.call_blockdev_create(node, device, device.size,
3996
                                       instance.name, force_open, info)
3997
  msg = result.RemoteFailMsg()
3998
  if msg:
3999
    raise errors.OpExecError("Can't create block device %s on"
4000
                             " node %s for instance %s: %s" %
4001
                             (device, node, instance.name, msg))
4002
  if device.physical_id is None:
4003
    device.physical_id = result.payload
4004

    
4005

    
4006
def _GenerateUniqueNames(lu, exts):
4007
  """Generate a suitable LV name.
4008

4009
  This will generate logical volume names for the given instance.
4010

4011
  """
4012
  results = []
4013
  for val in exts:
4014
    new_id = lu.cfg.GenerateUniqueID()
4015
    results.append("%s%s" % (new_id, val))
4016
  return results
4017

    
4018

    
4019
def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4020
                         p_minor, s_minor):
4021
  """Generate a drbd8 device complete with its children.
4022

4023
  """
4024
  port = lu.cfg.AllocatePort()
4025
  vgname = lu.cfg.GetVGName()
4026
  shared_secret = lu.cfg.GenerateDRBDSecret()
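  # The resulting tree is a single LD_DRBD8 device whose logical_id ties
  # together the two nodes, the network port, the per-node minors and the
  # shared secret, with two LD_LV children: the data volume of the requested
  # size and a 128 MB metadata volume.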
4027
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4028
                          logical_id=(vgname, names[0]))
4029
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4030
                          logical_id=(vgname, names[1]))
4031
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4032
                          logical_id=(primary, secondary, port,
4033
                                      p_minor, s_minor,
4034
                                      shared_secret),
4035
                          children=[dev_data, dev_meta],
4036
                          iv_name=iv_name)
4037
  return drbd_dev
4038

    
4039

    
4040
def _GenerateDiskTemplate(lu, template_name,
4041
                          instance_name, primary_node,
4042
                          secondary_nodes, disk_info,
4043
                          file_storage_dir, file_driver,
4044
                          base_index):
4045
  """Generate the entire disk layout for a given template type.
4046

4047
  """
4048
  #TODO: compute space requirements
4049

    
4050
  vgname = lu.cfg.GetVGName()
4051
  disk_count = len(disk_info)
4052
  disks = []
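  # disk_info is a list of dicts carrying at least "size" and "mode" keys,
  # e.g. [{"size": 10240, "mode": "rw"}] (illustrative values).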
4053
  if template_name == constants.DT_DISKLESS:
4054
    pass
4055
  elif template_name == constants.DT_PLAIN:
4056
    if len(secondary_nodes) != 0:
4057
      raise errors.ProgrammerError("Wrong template configuration")
4058

    
4059
    names = _GenerateUniqueNames(lu, [".disk%d" % i
4060
                                      for i in range(disk_count)])
4061
    for idx, disk in enumerate(disk_info):
4062
      disk_index = idx + base_index
4063
      disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4064
                              logical_id=(vgname, names[idx]),
4065
                              iv_name="disk/%d" % disk_index,
4066
                              mode=disk["mode"])
4067
      disks.append(disk_dev)
4068
  elif template_name == constants.DT_DRBD8:
4069
    if len(secondary_nodes) != 1:
4070
      raise errors.ProgrammerError("Wrong template configuration")
4071
    remote_node = secondary_nodes[0]
4072
    minors = lu.cfg.AllocateDRBDMinor(
4073
      [primary_node, remote_node] * len(disk_info), instance_name)
4074

    
4075
    names = []
4076
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4077
                                               for i in range(disk_count)]):
4078
      names.append(lv_prefix + "_data")
4079
      names.append(lv_prefix + "_meta")
4080
    for idx, disk in enumerate(disk_info):
4081
      disk_index = idx + base_index
4082
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4083
                                      disk["size"], names[idx*2:idx*2+2],
4084
                                      "disk/%d" % disk_index,
4085
                                      minors[idx*2], minors[idx*2+1])
4086
      disk_dev.mode = disk["mode"]
4087
      disks.append(disk_dev)
4088
  elif template_name == constants.DT_FILE:
4089
    if len(secondary_nodes) != 0:
4090
      raise errors.ProgrammerError("Wrong template configuration")
4091

    
4092
    for idx, disk in enumerate(disk_info):
4093
      disk_index = idx + base_index
4094
      disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4095
                              iv_name="disk/%d" % disk_index,
4096
                              logical_id=(file_driver,
4097
                                          "%s/disk%d" % (file_storage_dir,
4098
                                                         disk_index)),
4099
                              mode=disk["mode"])
4100
      disks.append(disk_dev)
4101
  else:
4102
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4103
  return disks
4104

    
4105

    
4106
def _GetInstanceInfoText(instance):
4107
  """Compute that text that should be added to the disk's metadata.
4108

4109
  """
4110
  return "originstname+%s" % instance.name
4111

    
4112

    
4113
def _CreateDisks(lu, instance):
4114
  """Create all disks for an instance.
4115

4116
  This abstracts away some work from AddInstance.
4117

4118
  @type lu: L{LogicalUnit}
4119
  @param lu: the logical unit on whose behalf we execute
4120
  @type instance: L{objects.Instance}
4121
  @param instance: the instance whose disks we should create
4122
  @rtype: boolean
4123
  @return: the success of the creation
4124

4125
  """
4126
  info = _GetInstanceInfoText(instance)
4127
  pnode = instance.primary_node
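  # For file-based instances the storage directory has to exist on the
  # primary node first; afterwards every device is created on all of the
  # instance's nodes, but creation is only forced on the primary unless the
  # device itself requests creation on secondaries via CreateOnSecondary().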
4128

    
4129
  if instance.disk_template == constants.DT_FILE:
4130
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4131
    result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4132

    
4133
    if result.failed or not result.data:
4134
      raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4135

    
4136
    if not result.data[0]:
4137
      raise errors.OpExecError("Failed to create directory '%s'" %
4138
                               file_storage_dir)
4139

    
4140
  # Note: this needs to be kept in sync with adding of disks in
4141
  # LUSetInstanceParams
4142
  for device in instance.disks:
4143
    logging.info("Creating volume %s for instance %s",
4144
                 device.iv_name, instance.name)
4145
    #HARDCODE
4146
    for node in instance.all_nodes:
4147
      f_create = node == pnode
4148
      _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4149

    
4150

    
4151
def _RemoveDisks(lu, instance):
4152
  """Remove all disks for an instance.
4153

4154
  This abstracts away some work from `AddInstance()` and
4155
  `RemoveInstance()`. Note that in case some of the devices couldn't
4156
  be removed, the removal will continue with the other ones (compare
4157
  with `_CreateDisks()`).
4158

4159
  @type lu: L{LogicalUnit}
4160
  @param lu: the logical unit on whose behalf we execute
4161
  @type instance: L{objects.Instance}
4162
  @param instance: the instance whose disks we should remove
4163
  @rtype: boolean
4164
  @return: the success of the removal
4165

4166
  """
4167
  logging.info("Removing block devices for instance %s", instance.name)
4168

    
4169
  all_result = True
4170
  for device in instance.disks:
4171
    for node, disk in device.ComputeNodeTree(instance.primary_node):
4172
      lu.cfg.SetDiskID(disk, node)
4173
      msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4174
      if msg:
4175
        lu.LogWarning("Could not remove block device %s on node %s,"
4176
                      " continuing anyway: %s", device.iv_name, node, msg)
4177
        all_result = False
4178

    
4179
  if instance.disk_template == constants.DT_FILE:
4180
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4181
    result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4182
                                                 file_storage_dir)
4183
    if result.failed or not result.data:
4184
      logging.error("Could not remove directory '%s'", file_storage_dir)
4185
      all_result = False
4186

    
4187
  return all_result
4188

    
4189

    
4190
def _ComputeDiskSize(disk_template, disks):
4191
  """Compute disk size requirements in the volume group
4192

4193
  """
4194
  # Required free disk space as a function of disk template and disk sizes
4195
  req_size_dict = {
4196
    constants.DT_DISKLESS: None,
4197
    constants.DT_PLAIN: sum(d["size"] for d in disks),
4198
    # 128 MB are added for drbd metadata for each disk
4199
    constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4200
    constants.DT_FILE: None,
4201
  }
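  # Illustrative example: two drbd8 disks of 10240 and 2048 would require
  # (10240 + 128) + (2048 + 128) = 12544 in the volume group, while the same
  # disks as plain LVs would require 12288.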
4202

    
4203
  if disk_template not in req_size_dict:
4204
    raise errors.ProgrammerError("Disk template '%s' size requirement"
4205
                                 " is unknown" %  disk_template)
4206

    
4207
  return req_size_dict[disk_template]
4208

    
4209

    
4210
def _CheckHVParams(lu, nodenames, hvname, hvparams):
4211
  """Hypervisor parameter validation.
4212

4213
  This function abstracts the hypervisor parameter validation to be
4214
  used in both instance create and instance modify.
4215

4216
  @type lu: L{LogicalUnit}
4217
  @param lu: the logical unit for which we check
4218
  @type nodenames: list
4219
  @param nodenames: the list of nodes on which we should check
4220
  @type hvname: string
4221
  @param hvname: the name of the hypervisor we should use
4222
  @type hvparams: dict
4223
  @param hvparams: the parameters which we need to check
4224
  @raise errors.OpPrereqError: if the parameters are not valid
4225

4226
  """
4227
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4228
                                                  hvname,
4229
                                                  hvparams)
4230
  for node in nodenames:
4231
    info = hvinfo[node]
4232
    if info.offline:
4233
      continue
4234
    msg = info.RemoteFailMsg()
4235
    if msg:
4236
      raise errors.OpPrereqError("Hypervisor parameter validation"
4237
                                 " failed on node %s: %s" % (node, msg))
4238

    
4239

    
4240
class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disks", "disk_template",
              "mode", "start",
              "wait_for_sync", "ip_check", "nics",
              "hvparams", "beparams"]
  REQ_BGL = False

  def _ExpandNode(self, node):
    """Expands and checks one node name.

    """
    node_full = self.cfg.ExpandNodeName(node)
    if node_full is None:
      raise errors.OpPrereqError("Unknown node %s" % node)
    return node_full

  def ExpandNames(self):
    """ExpandNames for CreateInstance.

    Figure out the right locks for instance creation.

    """
    self.needed_locks = {}

    # set optional parameters to none if they don't exist
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    # cheap checks, mostly valid constants given

    # verify creation mode
    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    if self.op.hypervisor is None:
      self.op.hypervisor = self.cfg.GetHypervisorType()

    cluster = self.cfg.GetClusterInfo()
    enabled_hvs = cluster.enabled_hypervisors
    if self.op.hypervisor not in enabled_hvs:
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
                                 " cluster (%s)" % (self.op.hypervisor,
                                  ",".join(enabled_hvs)))

    # check hypervisor parameter syntax (locally)
    utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
    filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
                                  self.op.hvparams)
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
    hv_type.CheckParameterSyntax(filled_hvp)

    # fill and remember the beparams dict
    utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
    self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                    self.op.beparams)

    #### instance parameters check

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)
    self.op.instance_name = instance_name = hostname1.name

    # this is just a preventive check, but someone might still add this
    # instance in the meantime, and creation will fail at lock-add time
    if instance_name in self.cfg.GetInstanceList():
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    self.add_locks[locking.LEVEL_INSTANCE] = instance_name

    # NIC buildup
    self.nics = []
    for nic in self.op.nics:
      # ip validity checks
      ip = nic.get("ip", None)
      if ip is None or ip.lower() == "none":
        nic_ip = None
      elif ip.lower() == constants.VALUE_AUTO:
        nic_ip = hostname1.ip
      else:
        if not utils.IsValidIP(ip):
          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
                                     " like a valid IP" % ip)
        nic_ip = ip

      # MAC address verification
      mac = nic.get("mac", constants.VALUE_AUTO)
      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
        if not utils.IsValidMac(mac.lower()):
          raise errors.OpPrereqError("Invalid MAC address specified: %s" %
                                     mac)
      # bridge verification
      bridge = nic.get("bridge", None)
      if bridge is None:
        bridge = self.cfg.GetDefBridge()
      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))

    # disk checks/pre-build
    self.disks = []
    for disk in self.op.disks:
      mode = disk.get("mode", constants.DISK_RDWR)
      if mode not in constants.DISK_ACCESS_SET:
        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                   mode)
      size = disk.get("size", None)
      if size is None:
        raise errors.OpPrereqError("Missing disk size")
      try:
        size = int(size)
      except ValueError:
        raise errors.OpPrereqError("Invalid disk size '%s'" % size)
      self.disks.append({"size": size, "mode": mode})

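    # Illustrative note (not part of the original code): after the loop above
    # self.disks is a list of dicts such as
    #   [{"size": 10240, "mode": constants.DISK_RDWR}]
    # where "size" is always an int (in MB) and "mode" one of
    # constants.DISK_ACCESS_SET.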
    # used in CheckPrereq for ip ping check
    self.check_ip = hostname1.ip

    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory path not absolute")

    ### Node/iallocator related checks
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.op.pnode = self._ExpandNode(self.op.pnode)
      nodelist = [self.op.pnode]
      if self.op.snode is not None:
        self.op.snode = self._ExpandNode(self.op.snode)
        nodelist.append(self.op.snode)
      self.needed_locks[locking.LEVEL_NODE] = nodelist

    # in case of import lock the source node too
    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)

      if src_path is None:
        self.op.src_path = src_path = self.op.instance_name

      if src_node is None:
        self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
        self.op.src_node = None
        if os.path.isabs(src_path):
          raise errors.OpPrereqError("Importing an instance from an absolute"
                                     " path requires a source node option.")
      else:
        self.op.src_node = src_node = self._ExpandNode(src_node)
        if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
          self.needed_locks[locking.LEVEL_NODE].append(src_node)
        if not os.path.isabs(src_path):
          self.op.src_path = src_path = \
            os.path.join(constants.EXPORT_DIR, src_path)

    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    nics = [n.ToDict() for n in self.nics]
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.be_full[constants.BE_VCPUS],
                     mem_size=self.be_full[constants.BE_MEMORY],
                     disks=self.disks,
                     nics=nics,
                     hypervisor=self.op.hypervisor,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 ", ".join(ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["SRC_NODE"] = self.op.src_node
      env["SRC_PATH"] = self.op.src_path
      env["SRC_IMAGES"] = self.src_images

    env.update(_BuildInstanceHookEnv(
      name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.op.start,
      os_type=self.op.os_type,
      memory=self.be_full[constants.BE_MEMORY],
      vcpus=self.be_full[constants.BE_VCPUS],
      nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
      disk_template=self.op.disk_template,
      disks=[(d["size"], d["mode"]) for d in self.disks],
    ))

    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl


  def CheckPrereq(self):
    """Check prerequisites.

    """
    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = self.op.src_node
      src_path = self.op.src_path

      if src_node is None:
        exp_list = self.rpc.call_export_list(
          self.acquired_locks[locking.LEVEL_NODE])
        found = False
        for node in exp_list:
          if not exp_list[node].failed and src_path in exp_list[node].data:
            found = True
            self.op.src_node = src_node = node
            self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
                                                       src_path)
            break
        if not found:
          raise errors.OpPrereqError("No export found for relative path %s" %
                                      src_path)

      _CheckNodeOnline(self, src_node)
      result = self.rpc.call_export_info(src_node, src_path)
      result.Raise()
      if not result.data:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      export_info = result.data
      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      # Check that the new instance doesn't have less disks than the export
      instance_disks = len(self.disks)
      export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
      if instance_disks < export_disks:
        raise errors.OpPrereqError("Not enough disks to import."
                                   " (instance: %d, export: %d)" %
                                   (instance_disks, export_disks))

      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      disk_images = []
      for idx in range(export_disks):
        option = 'disk%d_dump' % idx
        if export_info.has_option(constants.INISECT_INS, option):
          # FIXME: are the old os-es, disk sizes, etc. useful?
          export_name = export_info.get(constants.INISECT_INS, option)
          image = os.path.join(src_path, export_name)
          disk_images.append(image)
        else:
          disk_images.append(False)

      self.src_images = disk_images

      old_name = export_info.get(constants.INISECT_INS, 'name')
      # FIXME: int() here could throw a ValueError on broken exports
      exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
      if self.op.instance_name == old_name:
        for idx, nic in enumerate(self.nics):
          if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
            nic_mac_ini = 'nic%d_mac' % idx
            nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)

    # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
    # ip ping checks (we use the same ip that was resolved in ExpandNames)
    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (self.check_ip, self.op.instance_name))

    #### mac address generation
    # By generating here the mac address both the allocator and the hooks get
    # the real final mac address rather than the 'auto' or 'generate' value.
    # There is a race condition between the generation and the instance object
    # creation, which means that we know the mac is valid now, but we're not
    # sure it will be when we actually add the instance. If things go bad
    # adding the instance will abort because of a duplicate mac, and the
    # creation job will fail.
    for nic in self.nics:
      if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
        nic.mac = self.cfg.GenerateMAC()

    #### allocator run

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
    assert self.pnode is not None, \
      "Cannot retrieve locked node %s" % self.op.pnode
    if pnode.offline:
      raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
                                 pnode.name)
    if pnode.drained:
      raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
                                 pnode.name)

    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if self.op.snode is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")
      if self.op.snode == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      _CheckNodeOnline(self, self.op.snode)
      _CheckNodeNotDrained(self, self.op.snode)
      self.secondaries.append(self.op.snode)

    nodenames = [pnode.name] + self.secondaries

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.disks)

    # Check lv size requirements
    if req_size is not None:
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                         self.op.hypervisor)
      for node in nodenames:
        info = nodeinfo[node]
        info.Raise()
        info = info.data
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

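    # Worked example (illustrative, not part of the original code): a DRBD8
    # instance with a single 10240 MB disk gives req_size = 10368 MB, which
    # must be available as vg_free on the primary and on the secondary node.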
    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)

    # os verification
    result = self.rpc.call_os_get(pnode.name, self.op.os_type)
    result.Raise()
    if not isinstance(result.data, objects.OS):
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node" % self.op.os_type)

    # bridge check on primary node
    bridges = [n.bridge for n in self.nics]
    result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
    result.Raise()
    if not result.data:
      raise errors.OpPrereqError("One of the target bridges '%s' does not"
                                 " exist on destination node '%s'" %
                                 (",".join(bridges), pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.be_full[constants.BE_MEMORY],
                           self.op.hypervisor)

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    ht_kind = self.op.hypervisor
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    ##if self.op.vnc_bind_address is None:
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.cfg.GetFileStorageDir(),
                                        string_file_storage_dir, instance))


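    # Illustrative note (not part of the original code): the resulting path is
    # the normalised form of
    #   <cluster file storage dir>/<op.file_storage_dir or "">/<instance name>
    # i.e. a per-instance directory below the cluster-wide file storage
    # directory.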
    disks = _GenerateDiskTemplate(self,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries,
                                  self.disks,
                                  file_storage_dir,
                                  self.op.file_driver,
                                  0)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            nics=self.nics, disks=disks,
                            disk_template=self.op.disk_template,
                            admin_up=False,
                            network_port=network_port,
                            beparams=self.op.beparams,
                            hvparams=self.op.hvparams,
                            hypervisor=self.op.hypervisor,
                            )

    feedback_fn("* creating instance disks...")
    try:
      _CreateDisks(self, iobj)
    except errors.OpExecError:
      self.LogWarning("Device creation failed, reverting...")
      try:
        _RemoveDisks(self, iobj)
      finally:
        self.cfg.ReleaseDRBDMinors(instance)
        raise

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Declare that we don't want to remove the instance lock anymore, as we've
    # added the instance to the config
    del self.remove_locks[locking.LEVEL_INSTANCE]
    # Unlock all the nodes
    if self.op.mode == constants.INSTANCE_IMPORT:
      nodes_keep = [self.op.src_node]
      nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
                       if node != self.op.src_node]
      self.context.glm.release(locking.LEVEL_NODE, nodes_release)
      self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
    else:
      self.context.glm.release(locking.LEVEL_NODE)
      del self.acquired_locks[locking.LEVEL_NODE]

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, iobj)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(self, iobj)
      self.cfg.RemoveInstance(iobj.name)
      # Make sure the instance lock gets removed
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        result = self.rpc.call_instance_os_add(pnode_name, iobj)
        msg = result.RemoteFailMsg()
        if msg:
          raise errors.OpExecError("Could not add os for instance %s"
                                   " on node %s: %s" %
                                   (instance, pnode_name, msg))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_images = self.src_images
        cluster_name = self.cfg.GetClusterName()
        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
                                                         src_node, src_images,
                                                         cluster_name)
        import_result.Raise()
        for idx, result in enumerate(import_result.data):
          if not result:
            self.LogWarning("Could not import the image %s for instance"
                            " %s, disk %d, on node %s" %
                            (src_images[idx], instance, idx, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      iobj.admin_up = True
      self.cfg.Update(iobj)
      logging.info("Starting instance %s on node %s", instance, pnode_name)
      feedback_fn("* starting instance...")
      result = self.rpc.call_instance_start(pnode_name, iobj)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not start instance: %s" % msg)


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = self.rpc.call_instance_list([node],
                                             [instance.hypervisor])[node]
    node_insts.Raise()

    if instance.name not in node_insts.data:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logging.debug("Connecting to console of %s on %s", instance.name, node)

    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    cluster = self.cfg.GetClusterInfo()
    # beparams and hvparams are passed separately, to avoid editing the
    # instance and then saving the defaults in the instance itself.
    hvparams = cluster.FillHV(instance)
    beparams = cluster.FillBE(instance)
    console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]
  REQ_BGL = False

  def CheckArguments(self):
    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None
    if not hasattr(self.op, "iallocator"):
      self.op.iallocator = None

    # check for valid parameter combination
    cnt = [self.op.remote_node, self.op.iallocator].count(None)
    if self.op.mode == constants.REPLACE_DISK_CHG:
      if cnt == 2:
        raise errors.OpPrereqError("When changing the secondary either an"
                                   " iallocator script must be used or the"
                                   " new node given")
      elif cnt == 0:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
    else: # not replacing the secondary
      if cnt != 2:
        raise errors.OpPrereqError("The iallocator and new node options can"
                                   " be used only when changing the"
                                   " secondary node")

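  # Summary of the combinations accepted by CheckArguments above
  # (illustrative, not part of the original code):
  #   REPLACE_DISK_CHG: exactly one of remote_node / iallocator must be given
  #   REPLACE_DISK_PRI, REPLACE_DISK_SEC: neither may be given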
  def ExpandNames(self):
    self._ExpandAndLockInstance()

    if self.op.iallocator is not None:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    elif self.op.remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.op.remote_node = remote_node
      # Warning: do not remove the locking of the new secondary here
      # unless DRBD8.AddChildren is changed to work in parallel;
      # currently it doesn't since parallel invocations of
      # FindUnusedMinor will conflict
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    # If we're not already locking all nodes in the set we have to declare the
    # instance's primary/secondary nodes.
    if (level == locking.LEVEL_NODE and
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
      self._LockInstancesNodes()

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    self.LogInfo("Selected new secondary for the instance: %s",
                 self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

    if instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
                                 " instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    if self.op.iallocator is not None:
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      raise errors.OpPrereqError("The specified node is already the"
                                 " secondary node of the instance.")

    if self.op.mode == constants.REPLACE_DISK_PRI:
      n1 = self.tgt_node = instance.primary_node
      n2 = self.oth_node = self.sec_node
    elif self.op.mode == constants.REPLACE_DISK_SEC:
      n1 = self.tgt_node = self.sec_node
      n2 = self.oth_node = instance.primary_node
    elif self.op.mode == constants.REPLACE_DISK_CHG:
      n1 = self.new_node = remote_node
      n2 = self.oth_node = instance.primary_node
      self.tgt_node = self.sec_node
      _CheckNodeNotDrained(self, remote_node)
    else:
      raise errors.ProgrammerError("Unhandled disk replace mode")

    _CheckNodeOnline(self, n1)
    _CheckNodeOnline(self, n2)

    if not self.op.disks:
      self.op.disks = range(len(instance.disks))

    for disk_idx in self.op.disks:
      instance.FindDisk(disk_idx)

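  # Illustrative summary (not part of the original code) of the node roles
  # set up by CheckPrereq above:
  #   REPLACE_DISK_PRI: tgt_node = primary,   oth_node = secondary
  #   REPLACE_DISK_SEC: tgt_node = secondary, oth_node = primary
  #   REPLACE_DISK_CHG: new_node = remote node, oth_node = primary,
  #                     tgt_node = old secondary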
  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:

      1. for each disk to be replaced:

        1. create new LVs on the target node with unique names
        1. detach old LVs from the drbd device
        1. rename old LVs to name_replaced.<time_t>
        1. rename new LVs to old LVs
        1. attach the new LVs (with the old names now) to the drbd device

      1. wait for sync across all devices

      1. for each modified disk:

        1. remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = self.rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results[node]
      if res.failed or not res.data or my_vg not in res.data:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking disk/%d on %s" % (idx, node))
        cfg.SetDiskID(dev, node)
        result = self.rpc.call_blockdev_find(node, dev)
        msg = result.RemoteFailMsg()
        if not msg and not result.payload:
          msg = "disk not found"
        if msg:
          raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
                                   (idx, node, msg))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      info("checking disk/%d consistency on %s" % (idx, oth_node))
      if not _CheckDiskConsistency(self, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".disk%d_%s" % (idx, suf)
                  for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(self, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # we pass force_create=True to force the LVM creation
      for new_lv in new_lvs:
        _CreateBlockDev(self, tgt_node, instance, new_lv, True,
                        _GetInstanceInfoText(instance), False)

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        result = self.rpc.call_blockdev_find(tgt_node, to_ren)
        if not result.RemoteFailMsg() and result.payload:
          # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
      if result.failed or not result.data:
        for new_lv in new_lvs:
          msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
          if msg:
            warning("Can't rollback device %s: %s", dev, msg,
                    hint="cleanup manually the unused logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(self, instance, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      result = self.rpc.call_blockdev_find(instance.primary_node, dev)
      msg = result.RemoteFailMsg()
      if not msg and not result.payload:
        msg = "disk not found"
      if msg:
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
                                 (name, msg))
      if result.payload[5]:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
        if msg:
          warning("Can't remove old LV: %s" % msg,
                  hint="manually remove unused LVs")
          continue

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node
    nodes_ip = {
      old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
      new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
      pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
      }

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = self.rpc.call_vg_list([pri_node, new_node])
    for node in pri_node, new_node:
      res = results[node]
      if res.failed or not res.data or my_vg not in res.data:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      info("checking disk/%d on %s" % (idx, pri_node))
      cfg.SetDiskID(dev, pri_node)
      result = self.rpc.call_blockdev_find(pri_node, dev)
      msg = result.RemoteFailMsg()
      if not msg and not result.payload:
        msg = "disk not found"
      if msg:
        raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
                                 (idx, pri_node, msg))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      info("checking disk/%d consistency on %s" % (idx, pri_node))
      if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for idx, dev in enumerate(instance.disks):
      info("adding new local storage on %s for disk/%d" %
           (new_node, idx))
      # we pass force_create=True to force LVM creation
      for new_lv in dev.children:
        _CreateBlockDev(self, new_node, instance, new_lv, True,
                        _GetInstanceInfoText(instance), False)

    # Step 4: drbd minors and drbd setups changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
                                   instance.name)
    logging.debug("Allocated minors %s" % (minors,))
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
      size = dev.size
      info("activating a new drbd on %s for disk/%d" % (new_node, idx))
      # create new devices on new_node; note that we create two IDs:
      # one without port, so the drbd will be activated without
      # networking information on the new node at this stage, and one
      # with network, for the latter activation in step 4
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
      if pri_node == o_node1:
        p_minor = o_minor1
      else:
        p_minor = o_minor2

      new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
      new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)

      iv_names[idx] = (dev, dev.children, new_net_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_net_id)
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=new_alone_id,
                              children=dev.children)
      try:
        _CreateSingleBlockDev(self, new_node, instance, new_drbd,
                              _GetInstanceInfoText(instance), False)
      except errors.GenericError:
        self.cfg.ReleaseDRBDMinors(instance.name)
        raise

    for idx, dev in enumerate(instance.disks):
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for disk/%d on old node" % idx)
      cfg.SetDiskID(dev, old_node)
      msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
      if msg:
        warning("Failed to shutdown drbd for disk/%d on old node: %s" %
                (idx, msg),
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
                                               instance.disks)[pri_node]

    msg = result.RemoteFailMsg()
    if msg:
      # detaches didn't succeed (unlikely)
      self.cfg.ReleaseDRBDMinors(instance.name)
      raise errors.OpExecError("Can't detach the disks from the network on"
                               " old node: %s" % (msg,))

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev, _, new_logical_id in iv_names.itervalues():
      dev.logical_id = new_logical_id
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
                                           instance.disks, instance.name,
                                           False)
    for to_node, to_result in result.items():
      msg = to_result.RemoteFailMsg()
      if msg:
        warning("can't attach drbd disks on node %s: %s", to_node, msg,
                hint="please do a gnt-instance info to see the"
                " status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(self, instance, unlock=True)

    # so check manually all the devices
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      result = self.rpc.call_blockdev_find(pri_node, dev)
      msg = result.RemoteFailMsg()
      if not msg and not result.payload:
        msg = "disk not found"
      if msg:
        raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
                                 (idx, msg))
      if result.payload[5]:
        raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
      info("remove logical volumes for disk/%d" % idx)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
        if msg:
          warning("Can't remove LV on old secondary: %s", msg,
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance

    # Activate the instance disks if we're replacing them on a down instance
    if not instance.admin_up:
      _StartInstanceDisks(self, instance, True)

    if self.op.mode == constants.REPLACE_DISK_CHG:
      fn = self._ExecD8Secondary
    else:
      fn = self._ExecD8DiskOnly

    ret = fn(feedback_fn)

    # Deactivate the instance disks if we're replacing them on a down instance
    if not instance.admin_up:
      _SafeShutdownInstanceDisks(self, instance)

    return ret


class LUGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    nodenames = list(instance.all_nodes)
    for node in nodenames:
      _CheckNodeOnline(self, node)


    self.instance = instance

    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing.")

    self.disk = instance.FindDisk(self.op.disk)

    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                       instance.hypervisor)
    for node in nodenames:
      info = nodeinfo[node]
      if info.failed or not info.data:
        raise errors.OpPrereqError("Cannot get current information"
                                   " from node '%s'" % node)
      vg_free = info.data.get('vg_free', None)
      if not isinstance(vg_free, int):
        raise errors.OpPrereqError("Can't compute free disk space on"
                                   " node %s" % node)
      if self.op.amount > vg_free:
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                   " %d MiB available, %d MiB required" %
                                   (node, vg_free, self.op.amount))

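  # Illustrative note on CheckPrereq above (not part of the original code):
  # growing a disk of a DRBD8 instance by 2048 MiB requires at least 2048 MiB
  # of vg_free on every node in instance.all_nodes, i.e. on both the primary
  # and the secondary.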
  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    disk = self.disk
    for node in instance.all_nodes:
      self.cfg.SetDiskID(disk, node)
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Grow request failed to node %s: %s" %
                                 (node, msg))
    disk.RecordGrow(self.op.amount)
    self.cfg.Update(instance)
    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, instance)
      if disk_abort:
        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
                             " status.\nPlease check the instance.")


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances", "static"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = self.cfg.ExpandInstanceName(name)
        if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" % name)
        self.wanted_names.append(full_name)
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
    else:
      self.wanted_names = None
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]
    return

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    static = self.op.static
    if not static:
      self.cfg.SetDiskID(dev, instance.primary_node)
      dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
      if dev_pstatus.offline:
        dev_pstatus = None
      else:
        msg = dev_pstatus.RemoteFailMsg()
        if msg:
          raise errors.OpExecError("Can't compute disk status for %s: %s" %
                                   (instance.name, msg))
        dev_pstatus = dev_pstatus.payload
    else:
      dev_pstatus = None

    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode and not static:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
      if dev_sstatus.offline:
        dev_sstatus = None
      else:
        msg = dev_sstatus.RemoteFailMsg()
        if msg:
          raise errors.OpExecError("Can't compute disk status for %s: %s" %
                                   (instance.name, msg))
        dev_sstatus = dev_sstatus.payload
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      "mode": dev.mode,
      }

    return data

  def Exec(self, feedback_fn):
5622
    """Gather and return data"""
5623
    result = {}
5624

    
5625
    cluster = self.cfg.GetClusterInfo()
5626

    
5627
    for instance in self.wanted_instances:
5628
      if not self.op.static:
5629
        remote_info = self.rpc.call_instance_info(instance.primary_node,
5630
                                                  instance.name,
5631
                                                  instance.hypervisor)
5632
        remote_info.Raise()
5633
        remote_info = remote_info.data
5634
        if remote_info and "state" in remote_info:
5635
          remote_state = "up"
5636
        else:
5637
          remote_state = "down"
5638
      else:
5639
        remote_state = None
5640
      if instance.admin_up:
5641
        config_state = "up"
5642
      else:
5643
        config_state = "down"
5644

    
5645
      disks = [self._ComputeDiskStatus(instance, None, device)
5646
               for device in instance.disks]
5647

    
5648
      idict = {
5649
        "name": instance.name,
5650
        "config_state": config_state,
5651
        "run_state": remote_state,
5652
        "pnode": instance.primary_node,
5653
        "snodes": instance.secondary_nodes,
5654
        "os": instance.os,
5655
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5656
        "disks": disks,
5657
        "hypervisor": instance.hypervisor,
5658
        "network_port": instance.network_port,
5659
        "hv_instance": instance.hvparams,
5660
        "hv_actual": cluster.FillHV(instance),
5661
        "be_instance": instance.beparams,
5662
        "be_actual": cluster.FillBE(instance),
5663
        }
5664

    
5665
      result[instance.name] = idict
5666

    
5667
    return result
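
# Illustrative sketch (not part of the module): for one instance, the mapping
# returned by LUQueryInstanceData.Exec above looks roughly like (values are
# made up, keys follow idict/_ComputeDiskStatus):
#
#   {"instance1.example.com": {
#      "name": "instance1.example.com",
#      "config_state": "up", "run_state": "up",
#      "pnode": "node1", "snodes": ["node2"],
#      "nics": [("aa:00:00:11:22:33", None, "xen-br0")],
#      "disks": [{"iv_name": "disk/0", "dev_type": "drbd8", "pstatus": ...,
#                 "sstatus": ..., "children": [...], "mode": "rw"}],
#      "hypervisor": "xen-pvm", ...}}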


class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def CheckArguments(self):
    if not hasattr(self.op, 'nics'):
      self.op.nics = []
    if not hasattr(self.op, 'disks'):
      self.op.disks = []
    if not hasattr(self.op, 'beparams'):
      self.op.beparams = {}
    if not hasattr(self.op, 'hvparams'):
      self.op.hvparams = {}
    self.op.force = getattr(self.op, "force", False)
    if not (self.op.nics or self.op.disks or
            self.op.hvparams or self.op.beparams):
      raise errors.OpPrereqError("No changes submitted")

    # Disk validation
    disk_addremove = 0
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        disk_addremove += 1
        continue
      elif disk_op == constants.DDM_ADD:
        disk_addremove += 1
      else:
        if not isinstance(disk_op, int):
          raise errors.OpPrereqError("Invalid disk index")
      if disk_op == constants.DDM_ADD:
        mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
        if mode not in constants.DISK_ACCESS_SET:
          raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
        size = disk_dict.get('size', None)
        if size is None:
          raise errors.OpPrereqError("Required disk parameter size missing")
        try:
          size = int(size)
        except ValueError, err:
          raise errors.OpPrereqError("Invalid disk size parameter: %s" %
                                     str(err))
        disk_dict['size'] = size
      else:
        # modification of disk
        if 'size' in disk_dict:
          raise errors.OpPrereqError("Disk size change not possible, use"
                                     " grow-disk")

    if disk_addremove > 1:
      raise errors.OpPrereqError("Only one disk add or remove operation"
                                 " supported at a time")

    # NIC validation
    nic_addremove = 0
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        nic_addremove += 1
        continue
      elif nic_op == constants.DDM_ADD:
        nic_addremove += 1
      else:
        if not isinstance(nic_op, int):
          raise errors.OpPrereqError("Invalid nic index")

      # nic_dict should be a dict
      nic_ip = nic_dict.get('ip', None)
      if nic_ip is not None:
        if nic_ip.lower() == constants.VALUE_NONE:
          nic_dict['ip'] = None
        else:
          if not utils.IsValidIP(nic_ip):
            raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)

      if nic_op == constants.DDM_ADD:
        nic_bridge = nic_dict.get('bridge', None)
        if nic_bridge is None:
          nic_dict['bridge'] = self.cfg.GetDefBridge()
        nic_mac = nic_dict.get('mac', None)
        if nic_mac is None:
          nic_dict['mac'] = constants.VALUE_AUTO

      if 'mac' in nic_dict:
        nic_mac = nic_dict['mac']
        if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          if not utils.IsValidMac(nic_mac):
            raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
        if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
          raise errors.OpPrereqError("'auto' is not a valid MAC address when"
                                     " modifying an existing nic")

    if nic_addremove > 1:
      raise errors.OpPrereqError("Only one NIC add or remove operation"
                                 " supported at a time")

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if constants.BE_MEMORY in self.be_new:
      args['memory'] = self.be_new[constants.BE_MEMORY]
    if constants.BE_VCPUS in self.be_new:
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
    # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
    # information at all.
    if self.op.nics:
      args['nics'] = []
      nic_override = dict(self.op.nics)
      for idx, nic in enumerate(self.instance.nics):
        if idx in nic_override:
          this_nic_override = nic_override[idx]
        else:
          this_nic_override = {}
        if 'ip' in this_nic_override:
          ip = this_nic_override['ip']
        else:
          ip = nic.ip
        if 'bridge' in this_nic_override:
          bridge = this_nic_override['bridge']
        else:
          bridge = nic.bridge
        if 'mac' in this_nic_override:
          mac = this_nic_override['mac']
        else:
          mac = nic.mac
        args['nics'].append((ip, bridge, mac))
      if constants.DDM_ADD in nic_override:
        ip = nic_override[constants.DDM_ADD].get('ip', None)
        bridge = nic_override[constants.DDM_ADD]['bridge']
        mac = nic_override[constants.DDM_ADD]['mac']
        args['nics'].append((ip, bridge, mac))
      elif constants.DDM_REMOVE in nic_override:
        del args['nics'][-1]

    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    force = self.force = self.op.force

    # checking the new params on the primary/secondary nodes

    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    pnode = instance.primary_node
    nodelist = list(instance.all_nodes)

    # hvparams processing
    if self.op.hvparams:
      i_hvdict = copy.deepcopy(instance.hvparams)
      for key, val in self.op.hvparams.iteritems():
        if val == constants.VALUE_DEFAULT:
          try:
            del i_hvdict[key]
          except KeyError:
            pass
        else:
          i_hvdict[key] = val
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
      hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
                                i_hvdict)
      # local check
      hypervisor.GetHypervisor(
        instance.hypervisor).CheckParameterSyntax(hv_new)
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
      self.hv_new = hv_new # the new actual values
      self.hv_inst = i_hvdict # the new dict (without defaults)
    else:
      self.hv_new = self.hv_inst = {}

    # beparams processing
    if self.op.beparams:
      i_bedict = copy.deepcopy(instance.beparams)
      for key, val in self.op.beparams.iteritems():
        if val == constants.VALUE_DEFAULT:
          try:
            del i_bedict[key]
          except KeyError:
            pass
        else:
          i_bedict[key] = val
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
      be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                i_bedict)
      self.be_new = be_new # the new actual values
      self.be_inst = i_bedict # the new dict (without defaults)
    else:
      self.be_new = self.be_inst = {}

    self.warn = []

    if constants.BE_MEMORY in self.op.beparams and not self.force:
      mem_check_list = [pnode]
      if be_new[constants.BE_AUTO_BALANCE]:
        # either we changed auto_balance to yes or it was from before
        mem_check_list.extend(instance.secondary_nodes)
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
                                                  instance.hypervisor)
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
                                         instance.hypervisor)
      if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s" % pnode)
      else:
        if not instance_info.failed and instance_info.data:
          current_mem = instance_info.data['memory']
        else:
          # Assume instance not running
          # (there is a slight race condition here, but it's not very probable,
          # and we have no other way to check)
          current_mem = 0
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
                    nodeinfo[pnode].data['memory_free'])
        if miss_mem > 0:
          raise errors.OpPrereqError("This change will prevent the instance"
                                     " from starting, due to %d MB of memory"
                                     " missing on its primary node" % miss_mem)

      if be_new[constants.BE_AUTO_BALANCE]:
        for node, nres in nodeinfo.iteritems():
          if node not in instance.secondary_nodes:
            continue
          if nres.failed or not isinstance(nres.data, dict):
            self.warn.append("Can't get info from secondary node %s" % node)
          elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
            self.warn.append("Not enough memory to failover instance to"
                             " secondary node %s" % node)

    # NIC processing
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        if not instance.nics:
          raise errors.OpPrereqError("Instance has no NICs, cannot remove")
        continue
      if nic_op != constants.DDM_ADD:
        # an existing nic
        if nic_op < 0 or nic_op >= len(instance.nics):
          raise errors.OpPrereqError("Invalid NIC index %s, valid values"
                                     " are 0 to %d" %
                                     (nic_op, len(instance.nics)))
      if 'bridge' in nic_dict:
        nic_bridge = nic_dict['bridge']
        if nic_bridge is None:
          raise errors.OpPrereqError('Cannot set the nic bridge to None')
        if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
          msg = ("Bridge '%s' doesn't exist on one of"
                 " the instance nodes" % nic_bridge)
          if self.force:
            self.warn.append(msg)
          else:
            raise errors.OpPrereqError(msg)
      if 'mac' in nic_dict:
        nic_mac = nic_dict['mac']
        if nic_mac is None:
          raise errors.OpPrereqError('Cannot set the nic mac to None')
        elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          # otherwise generate the mac
          nic_dict['mac'] = self.cfg.GenerateMAC()
        else:
          # or validate/reserve the current one
          if self.cfg.IsMacInUse(nic_mac):
            raise errors.OpPrereqError("MAC address %s already in use"
                                       " in cluster" % nic_mac)

    # DISK processing
    if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Disk operations not supported for"
                                 " diskless instances")
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        if len(instance.disks) == 1:
          raise errors.OpPrereqError("Cannot remove the last disk of"
                                     " an instance")
        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
        ins_l = ins_l[pnode]
        if ins_l.failed or not isinstance(ins_l.data, list):
          raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
        if instance.name in ins_l.data:
          raise errors.OpPrereqError("Instance is running, can't remove"
                                     " disks.")

      if (disk_op == constants.DDM_ADD and
          len(instance.disks) >= constants.MAX_DISKS):
        raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
                                   " add more" % constants.MAX_DISKS)
      if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
        # an existing disk
        if disk_op < 0 or disk_op >= len(instance.disks):
          raise errors.OpPrereqError("Invalid disk index %s, valid values"
                                     " are 0 to %d" %
                                     (disk_op, len(instance.disks)))

    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    # Process here the warnings from CheckPrereq, as we don't have a
    # feedback_fn there.
    for warn in self.warn:
      feedback_fn("WARNING: %s" % warn)

    result = []
    instance = self.instance
    # disk changes
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        # remove the last disk
        device = instance.disks.pop()
        device_idx = len(instance.disks)
        for node, disk in device.ComputeNodeTree(instance.primary_node):
          self.cfg.SetDiskID(disk, node)
          msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
          if msg:
            self.LogWarning("Could not remove disk/%d on node %s: %s,"
                            " continuing anyway", device_idx, node, msg)
        result.append(("disk/%d" % device_idx, "remove"))
      elif disk_op == constants.DDM_ADD:
        # add a new disk
        if instance.disk_template == constants.DT_FILE:
          file_driver, file_path = instance.disks[0].logical_id
          file_path = os.path.dirname(file_path)
        else:
          file_driver = file_path = None
        disk_idx_base = len(instance.disks)
        new_disk = _GenerateDiskTemplate(self,
                                         instance.disk_template,
                                         instance.name, instance.primary_node,
                                         instance.secondary_nodes,
                                         [disk_dict],
                                         file_path,
                                         file_driver,
                                         disk_idx_base)[0]
        instance.disks.append(new_disk)
        info = _GetInstanceInfoText(instance)

        logging.info("Creating volume %s for instance %s",
                     new_disk.iv_name, instance.name)
        # Note: this needs to be kept in sync with _CreateDisks
        #HARDCODE
        for node in instance.all_nodes:
          f_create = node == instance.primary_node
          try:
            _CreateBlockDev(self, node, instance, new_disk,
                            f_create, info, f_create)
          except errors.OpExecError, err:
            self.LogWarning("Failed to create volume %s (%s) on"
                            " node %s: %s",
                            new_disk.iv_name, new_disk, node, err)
        result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
                       (new_disk.size, new_disk.mode)))
      else:
        # change a given disk
        instance.disks[disk_op].mode = disk_dict['mode']
        result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
    # NIC changes
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        # remove the last nic
        del instance.nics[-1]
        result.append(("nic.%d" % len(instance.nics), "remove"))
      elif nic_op == constants.DDM_ADD:
        # mac and bridge should be set, by now
        mac = nic_dict['mac']
        bridge = nic_dict['bridge']
        new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
                              bridge=bridge)
        instance.nics.append(new_nic)
        result.append(("nic.%d" % (len(instance.nics) - 1),
                       "add:mac=%s,ip=%s,bridge=%s" %
                       (new_nic.mac, new_nic.ip, new_nic.bridge)))
      else:
        # change a given nic
        for key in 'mac', 'ip', 'bridge':
          if key in nic_dict:
            setattr(instance.nics[nic_op], key, nic_dict[key])
            result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))

    # hvparams changes
    if self.op.hvparams:
      instance.hvparams = self.hv_inst
      for key, val in self.op.hvparams.iteritems():
        result.append(("hv/%s" % key, val))

    # beparams changes
    if self.op.beparams:
      instance.beparams = self.be_inst
      for key, val in self.op.beparams.iteritems():
        result.append(("be/%s" % key, val))

    self.cfg.Update(instance)

    return result
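
# Illustrative sketch (not part of the module): the value returned by
# LUSetInstanceParams.Exec above is a list of (parameter, new value) pairs
# used for operator feedback, e.g. after adding a NIC and changing the memory
# it could look like:
#
#   [("nic.1", "add:mac=aa:00:00:11:22:33,ip=None,bridge=xen-br0"),
#    ("be/memory", 512)]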


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = ['nodes']
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    @rtype: dict
    @return: a dictionary with the structure node->(export-list)
        where export-list is a list of the instances exported on
        that node.

    """
    rpcresult = self.rpc.call_export_list(self.nodes)
    result = {}
    for node in rpcresult:
      if rpcresult[node].failed:
        result[node] = False
      else:
        result[node] = rpcresult[node].data

    return result


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
          "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

    self.dst_node = self.cfg.GetNodeInfo(
      self.cfg.ExpandNodeName(self.op.target_node))

    if self.dst_node is None:
      # This is a wrong node name, not a non-locked node
      raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
    _CheckNodeOnline(self, self.dst_node.name)
    _CheckNodeNotDrained(self, self.dst_node.name)

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      result = self.rpc.call_instance_shutdown(src_node, instance)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, src_node, msg))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    # set the disks ID correctly since call_instance_start needs the
    # correct drbd minor to create the symlinks
    for disk in instance.disks:
      self.cfg.SetDiskID(disk, src_node)

    try:
      for disk in instance.disks:
        # new_dev_name will be a snapshot of an lvm leaf of the one we passed
        new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
        if new_dev_name.failed or not new_dev_name.data:
          self.LogWarning("Could not snapshot block device %s on node %s",
                          disk.logical_id[1], src_node)
          snap_disks.append(False)
        else:
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                 logical_id=(vgname, new_dev_name.data),
                                 physical_id=(vgname, new_dev_name.data),
                                 iv_name=disk.iv_name)
          snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.admin_up:
        result = self.rpc.call_instance_start(src_node, instance)
        msg = result.RemoteFailMsg()
        if msg:
          _ShutdownInstanceDisks(self, instance)
          raise errors.OpExecError("Could not start instance: %s" % msg)

    # TODO: check for size

    cluster_name = self.cfg.GetClusterName()
    for idx, dev in enumerate(snap_disks):
      if dev:
        result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                               instance, cluster_name, idx)
        if result.failed or not result.data:
          self.LogWarning("Could not export block device %s from node %s to"
                          " node %s", dev.logical_id[1], src_node,
                          dst_node.name)
        msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
        if msg:
          self.LogWarning("Could not remove snapshot block device %s from node"
                          " %s: %s", dev.logical_id[1], src_node, msg)

    result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
    if result.failed or not result.data:
      self.LogWarning("Could not finalize export for instance %s on node %s",
                      instance.name, dst_node.name)

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed, the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if exportlist[node].failed:
          continue
        if instance.name in exportlist[node].data:
          if not self.rpc.call_export_remove(node, instance.name):
            self.LogWarning("Could not remove older export for instance %s"
                            " on node %s", instance.name, node)


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.
    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    exportlist = self.rpc.call_export_list(self.acquired_locks[
      locking.LEVEL_NODE])
    found = False
    for node in exportlist:
      if exportlist[node].failed:
        self.LogWarning("Failed to query node %s, continuing" % node)
        continue
      if instance_name in exportlist[node].data:
        found = True
        result = self.rpc.call_export_remove(node, instance_name)
        if result.failed or not result.data:
          logging.error("Could not remove export for instance %s"
                        " on node %s", instance_name, node)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        node_result.Raise()
        if not node_result.data:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result.data))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has several sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus", "hypervisor",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, lu, mode, name, **kwargs):
    self.lu = lu
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.hypervisor = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.lu.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": constants.IALLOCATOR_VERSION,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      hypervisor_name = self.hypervisor
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor

    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
                                           hypervisor_name)
    node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
                       cluster_info.enabled_hypervisors)
    for nname, nresult in node_data.items():
      # first fill in static (config-based) values
      ninfo = cfg.GetNodeInfo(nname)
      pnr = {
        "tags": list(ninfo.GetTags()),
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "offline": ninfo.offline,
        "drained": ninfo.drained,
        "master_candidate": ninfo.master_candidate,
        }

      if not ninfo.offline:
        nresult.Raise()
        if not isinstance(nresult.data, dict):
          raise errors.OpExecError("Can't get data for node %s" % nname)
        remote_info = nresult.data
        for attr in ['memory_total', 'memory_free', 'memory_dom0',
                     'vg_size', 'vg_free', 'cpu_total']:
          if attr not in remote_info:
            raise errors.OpExecError("Node '%s' didn't return attribute"
                                     " '%s'" % (nname, attr))
          try:
            remote_info[attr] = int(remote_info[attr])
          except ValueError, err:
            raise errors.OpExecError("Node '%s' returned invalid value"
                                     " for '%s': %s" % (nname, attr, err))
        # compute memory used by primary instances
        i_p_mem = i_p_up_mem = 0
        for iinfo, beinfo in i_list:
          if iinfo.primary_node == nname:
            i_p_mem += beinfo[constants.BE_MEMORY]
            if iinfo.name not in node_iinfo[nname].data:
              i_used_mem = 0
            else:
              i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
            i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
            remote_info['memory_free'] -= max(0, i_mem_diff)

            if iinfo.admin_up:
              i_p_up_mem += beinfo[constants.BE_MEMORY]

        # compute memory used by instances
        pnr_dyn = {
          "total_memory": remote_info['memory_total'],
          "reserved_memory": remote_info['memory_dom0'],
          "free_memory": remote_info['memory_free'],
          "total_disk": remote_info['vg_size'],
          "free_disk": remote_info['vg_free'],
          "total_cpus": remote_info['cpu_total'],
          "i_pri_memory": i_p_mem,
          "i_pri_up_memory": i_p_up_mem,
          }
        pnr.update(pnr_dyn)

      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "admin_up": iinfo.admin_up,
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data

    disk_space = _ComputeDiskSize(self.disk_template, self.disks)

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.lu.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance has not exactly one secondary node")

    self.required_nodes = 1
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.lu.rpc.call_iallocator_runner
    data = self.in_text

    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
    result.Raise()

    if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result.data

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict
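
# Illustrative sketch (not part of the module): allocating a new mirrored
# instance through the framework above would be driven roughly like this,
# from inside a LogicalUnit `lu` (values and allocator name are placeholders):
#
#   ial = IAllocator(lu, mode=constants.IALLOCATOR_MODE_ALLOC,
#                    name="instance1.example.com", mem_size=512,
#                    disks=[{'size': 1024, 'mode': 'w'}],
#                    disk_template=constants.DT_DRBD8, os="debootstrap",
#                    tags=[], nics=[{'mac': 'auto', 'ip': None,
#                                    'bridge': None}],
#                    vcpus=1, hypervisor=constants.HT_XEN_PVM)
#   ial.Run("my-allocator")
#   if ial.success:
#     target_nodes = ial.nodes  # node names chosen by the external script
#
# The constructor builds self.in_text (the serialized input handed to the
# script); Run() and _ValidateResult() fill in success/info/nodes from its
# output.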


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
      if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    else:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result