#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import time
import tempfile
import re
import platform
import logging
import copy
import random

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer
from ganeti import ssconf


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    self.CheckArguments()

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensure
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separate is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods can no longer worry about missing parameters.

    """
    pass

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None

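# Illustrative sketch (hypothetical, not used anywhere in this module): a
# minimal concurrent LU following the rules from the LogicalUnit docstring.
# The class name and opcode field are made up; the helpers and constants
# (_ExpandAndLockInstance, _LockInstancesNodes, locking.LEVEL_NODE,
# constants.LOCKS_REPLACE) are the ones defined or used above.
#
#   class LUExampleNoop(NoHooksLU):
#     _OP_REQP = ["instance_name"]
#     REQ_BGL = False
#
#     def ExpandNames(self):
#       self._ExpandAndLockInstance()
#       self.needed_locks[locking.LEVEL_NODE] = []
#       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
#
#     def DeclareLocks(self, level):
#       if level == locking.LEVEL_NODE:
#         self._LockInstancesNodes()
#
#     def CheckPrereq(self):
#       self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
#
#     def Exec(self, feedback_fn):
#       feedback_fn("Would act on instance %s" % self.instance.name)
#       return self.instance.name
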
def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.OpPrereqError: if the nodes parameter is wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)

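# Usage sketch (hypothetical): a LU's CheckPrereq would typically call
#
#   self.wanted = _GetWantedNodes(self, self.op.nodes)
#
# where self.op.nodes holds possibly abbreviated node names; the result is
# the fully expanded list, sorted via utils.NiceSort, and OpPrereqError is
# raised for any name that cannot be expanded.
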
def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
  return wanted


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta))

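# Usage sketch (hypothetical field names): a query LU validates the fields
# requested by the opcode against the static and dynamic fields it can
# produce, e.g.
#
#   _CheckOutputFields(static=utils.FieldSet("name", "pinst_cnt"),
#                      dynamic=utils.FieldSet("dfree", "mfree"),
#                      selected=self.op.output_fields)
#
# which raises OpPrereqError listing any selected field matching neither set.
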
def _CheckBooleanOpField(op, name):
  """Validates boolean opcode parameters.

  This will ensure that an opcode parameter is either a boolean value,
  or None (but that it always exists).

  """
  val = getattr(op, name, None)
  if not (val is None or isinstance(val, bool)):
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
                               (name, str(val)))
  setattr(op, name, val)


def _CheckNodeOnline(lu, node):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is offline

  """
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("Can't use offline node %s" % node)


def _CheckNodeNotDrained(lu, node):
  """Ensure that a given node is not drained.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @raise errors.OpPrereqError: if the node is drained

  """
  if lu.cfg.GetNodeInfo(node).drained:
    raise errors.OpPrereqError("Can't use drained node %s" % node)


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics, disk_template, disks):
  """Builds instance related env variables for hooks

  This builds the hook environment from individual variables.

  @type name: string
  @param name: the name of the instance
  @type primary_node: string
  @param primary_node: the name of the instance's primary node
  @type secondary_nodes: list
  @param secondary_nodes: list of secondary nodes as strings
  @type os_type: string
  @param os_type: the name of the instance's OS
  @type status: boolean
  @param status: the should_run status of the instance
  @type memory: string
  @param memory: the memory size of the instance
  @type vcpus: string
  @param vcpus: the count of VCPUs the instance has
  @type nics: list
  @param nics: list of tuples (ip, bridge, mac) representing
      the NICs the instance has
  @type disk_template: string
  @param disk_template: the disk template of the instance
  @type disks: list
  @param disks: the list of (size, mode) pairs
  @rtype: dict
  @return: the hook environment for this instance

  """
  if status:
    str_status = "up"
  else:
    str_status = "down"
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": str_status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    "INSTANCE_DISK_TEMPLATE": disk_template,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_MAC" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  if disks:
    disk_count = len(disks)
    for idx, (size, mode) in enumerate(disks):
      env["INSTANCE_DISK%d_SIZE" % idx] = size
      env["INSTANCE_DISK%d_MODE" % idx] = mode
  else:
    disk_count = 0

  env["INSTANCE_DISK_COUNT"] = disk_count

  return env

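# Illustration (hypothetical values): for a single-NIC, single-disk instance
# the function above returns a dict along the lines of
#
#   {"OP_TARGET": "inst1.example.com", "INSTANCE_NAME": "inst1.example.com",
#    "INSTANCE_PRIMARY": "node1", "INSTANCE_SECONDARIES": "node2",
#    "INSTANCE_OS_TYPE": "debian-etch", "INSTANCE_STATUS": "up",
#    "INSTANCE_MEMORY": 128, "INSTANCE_VCPUS": 1,
#    "INSTANCE_DISK_TEMPLATE": "drbd",
#    "INSTANCE_NIC_COUNT": 1, "INSTANCE_NIC0_IP": "",
#    "INSTANCE_NIC0_BRIDGE": "xen-br0",
#    "INSTANCE_NIC0_MAC": "aa:00:00:00:00:01",
#    "INSTANCE_DISK_COUNT": 1, "INSTANCE_DISK0_SIZE": 1024,
#    "INSTANCE_DISK0_MODE": "rw"}
#
# The hooks runner later prefixes each key with GANETI_ (see BuildHooksEnv).
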
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.admin_up,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
    'disk_template': instance.disk_template,
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)

def _AdjustCandidatePool(lu):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool()
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               ", ".join(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def _CheckInstanceBridgesExist(lu, instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
  result.Raise()
  if not result.data:
    raise errors.OpPrereqError("One or more target bridges %s does not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))

592
  """Logical unit for destroying the cluster.
593

594
  """
595
  _OP_REQP = []
596

    
597
  def CheckPrereq(self):
598
    """Check prerequisites.
599

600
    This checks whether the cluster is empty.
601

602
    Any errors are signalled by raising errors.OpPrereqError.
603

604
    """
605
    master = self.cfg.GetMasterNode()
606

    
607
    nodelist = self.cfg.GetNodeList()
608
    if len(nodelist) != 1 or nodelist[0] != master:
609
      raise errors.OpPrereqError("There are still %d node(s) in"
610
                                 " this cluster." % (len(nodelist) - 1))
611
    instancelist = self.cfg.GetInstanceList()
612
    if instancelist:
613
      raise errors.OpPrereqError("There are still %d instance(s) in"
614
                                 " this cluster." % len(instancelist))
615

    
616
  def Exec(self, feedback_fn):
617
    """Destroys the cluster.
618

619
    """
620
    master = self.cfg.GetMasterNode()
621
    result = self.rpc.call_node_stop_master(master, False)
622
    result.Raise()
623
    if not result.data:
624
      raise errors.OpExecError("Could not disable the master role")
625
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
626
    utils.CreateBackup(priv_key)
627
    utils.CreateBackup(pub_key)
628
    return master
629

    
630

    
631
class LUVerifyCluster(LogicalUnit):
632
  """Verifies the cluster status.
633

634
  """
635
  HPATH = "cluster-verify"
636
  HTYPE = constants.HTYPE_CLUSTER
637
  _OP_REQP = ["skip_checks"]
638
  REQ_BGL = False
639

    
640
  def ExpandNames(self):
641
    self.needed_locks = {
642
      locking.LEVEL_NODE: locking.ALL_SET,
643
      locking.LEVEL_INSTANCE: locking.ALL_SET,
644
    }
645
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
646

    
647
  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
648
                  node_result, feedback_fn, master_files,
649
                  drbd_map, vg_name):
650
    """Run multiple tests against a node.
651

652
    Test list:
653

654
      - compares ganeti version
655
      - checks vg existance and size > 20G
656
      - checks config file checksum
657
      - checks ssh to other nodes
658

659
    @type nodeinfo: L{objects.Node}
660
    @param nodeinfo: the node to check
661
    @param file_list: required list of files
662
    @param local_cksum: dictionary of local files and their checksums
663
    @param node_result: the results from the node
664
    @param feedback_fn: function used to accumulate results
665
    @param master_files: list of files that only masters should have
666
    @param drbd_map: the useddrbd minors for this node, in
667
        form of minor: (instance, must_exist) which correspond to instances
668
        and their running status
669
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
670

671
    """
672
    node = nodeinfo.name
673

    
674
    # main result, node_result should be a non-empty dict
675
    if not node_result or not isinstance(node_result, dict):
676
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
677
      return True
678

    
679
    # compares ganeti version
680
    local_version = constants.PROTOCOL_VERSION
681
    remote_version = node_result.get('version', None)
682
    if not (remote_version and isinstance(remote_version, (list, tuple)) and
683
            len(remote_version) == 2):
684
      feedback_fn("  - ERROR: connection to %s failed" % (node))
685
      return True
686

    
687
    if local_version != remote_version[0]:
688
      feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
689
                  " node %s %s" % (local_version, node, remote_version[0]))
690
      return True
691

    
692
    # node seems compatible, we can actually try to look into its results
693

    
694
    bad = False
695

    
696
    # full package version
697
    if constants.RELEASE_VERSION != remote_version[1]:
698
      feedback_fn("  - WARNING: software version mismatch: master %s,"
699
                  " node %s %s" %
700
                  (constants.RELEASE_VERSION, node, remote_version[1]))
701

    
702
    # checks vg existence and size > 20G
703
    if vg_name is not None:
704
      vglist = node_result.get(constants.NV_VGLIST, None)
705
      if not vglist:
706
        feedback_fn("  - ERROR: unable to check volume groups on node %s." %
707
                        (node,))
708
        bad = True
709
      else:
710
        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
711
                                              constants.MIN_VG_SIZE)
712
        if vgstatus:
713
          feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
714
          bad = True
715

    
716
    # checks config file checksum
717

    
718
    remote_cksum = node_result.get(constants.NV_FILELIST, None)
719
    if not isinstance(remote_cksum, dict):
720
      bad = True
721
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
722
    else:
723
      for file_name in file_list:
724
        node_is_mc = nodeinfo.master_candidate
725
        must_have_file = file_name not in master_files
726
        if file_name not in remote_cksum:
727
          if node_is_mc or must_have_file:
728
            bad = True
729
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
730
        elif remote_cksum[file_name] != local_cksum[file_name]:
731
          if node_is_mc or must_have_file:
732
            bad = True
733
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
734
          else:
735
            # not candidate and this is not a must-have file
736
            bad = True
737
            feedback_fn("  - ERROR: non master-candidate has old/wrong file"
738
                        " '%s'" % file_name)
739
        else:
740
          # all good, except non-master/non-must have combination
741
          if not node_is_mc and not must_have_file:
742
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
743
                        " candidates" % file_name)
744

    
745
    # checks ssh to any
746

    
747
    if constants.NV_NODELIST not in node_result:
748
      bad = True
749
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
750
    else:
751
      if node_result[constants.NV_NODELIST]:
752
        bad = True
753
        for node in node_result[constants.NV_NODELIST]:
754
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
755
                          (node, node_result[constants.NV_NODELIST][node]))
756

    
757
    if constants.NV_NODENETTEST not in node_result:
758
      bad = True
759
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
760
    else:
761
      if node_result[constants.NV_NODENETTEST]:
762
        bad = True
763
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
764
        for node in nlist:
765
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
766
                          (node, node_result[constants.NV_NODENETTEST][node]))
767

    
768
    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
769
    if isinstance(hyp_result, dict):
770
      for hv_name, hv_result in hyp_result.iteritems():
771
        if hv_result is not None:
772
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
773
                      (hv_name, hv_result))
774

    
775
    # check used drbd list
776
    if vg_name is not None:
777
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
778
      if not isinstance(used_minors, (tuple, list)):
779
        feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
780
                    str(used_minors))
781
      else:
782
        for minor, (iname, must_exist) in drbd_map.items():
783
          if minor not in used_minors and must_exist:
784
            feedback_fn("  - ERROR: drbd minor %d of instance %s is"
785
                        " not active" % (minor, iname))
786
            bad = True
787
        for minor in used_minors:
788
          if minor not in drbd_map:
789
            feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
790
                        minor)
791
            bad = True
792

    
793
    return bad
794

    
795
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
796
                      node_instance, feedback_fn, n_offline):
797
    """Verify an instance.
798

799
    This function checks to see if the required block devices are
800
    available on the instance's node.
801

802
    """
803
    bad = False
804

    
805
    node_current = instanceconfig.primary_node
806

    
807
    node_vol_should = {}
808
    instanceconfig.MapLVsByNode(node_vol_should)
809

    
810
    for node in node_vol_should:
811
      if node in n_offline:
812
        # ignore missing volumes on offline nodes
813
        continue
814
      for volume in node_vol_should[node]:
815
        if node not in node_vol_is or volume not in node_vol_is[node]:
816
          feedback_fn("  - ERROR: volume %s missing on node %s" %
817
                          (volume, node))
818
          bad = True
819

    
820
    if instanceconfig.admin_up:
821
      if ((node_current not in node_instance or
822
          not instance in node_instance[node_current]) and
823
          node_current not in n_offline):
824
        feedback_fn("  - ERROR: instance %s not running on node %s" %
825
                        (instance, node_current))
826
        bad = True
827

    
828
    for node in node_instance:
829
      if (not node == node_current):
830
        if instance in node_instance[node]:
831
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
832
                          (instance, node))
833
          bad = True
834

    
835
    return bad
836

    
837
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
838
    """Verify if there are any unknown volumes in the cluster.
839

840
    The .os, .swap and backup volumes are ignored. All other volumes are
841
    reported as unknown.
842

843
    """
844
    bad = False
845

    
846
    for node in node_vol_is:
847
      for volume in node_vol_is[node]:
848
        if node not in node_vol_should or volume not in node_vol_should[node]:
849
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
850
                      (volume, node))
851
          bad = True
852
    return bad
853

    
854
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
855
    """Verify the list of running instances.
856

857
    This checks what instances are running but unknown to the cluster.
858

859
    """
860
    bad = False
861
    for node in node_instance:
862
      for runninginstance in node_instance[node]:
863
        if runninginstance not in instancelist:
864
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
865
                          (runninginstance, node))
866
          bad = True
867
    return bad
868

    
869
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
870
    """Verify N+1 Memory Resilience.
871

872
    Check that if one single node dies we can still start all the instances it
873
    was primary for.
874

875
    """
876
    bad = False
877

    
878
    for node, nodeinfo in node_info.iteritems():
879
      # This code checks that every node which is now listed as secondary has
880
      # enough memory to host all instances it is supposed to should a single
881
      # other node in the cluster fail.
882
      # FIXME: not ready for failover to an arbitrary node
883
      # FIXME: does not support file-backed instances
884
      # WARNING: we currently take into account down instances as well as up
885
      # ones, considering that even if they're down someone might want to start
886
      # them even in the event of a node failure.
887
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
888
        needed_mem = 0
889
        for instance in instances:
890
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
891
          if bep[constants.BE_AUTO_BALANCE]:
892
            needed_mem += bep[constants.BE_MEMORY]
893
        if nodeinfo['mfree'] < needed_mem:
894
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
895
                      " failovers should node %s fail" % (node, prinode))
896
          bad = True
897
    return bad
898

    
899
  def CheckPrereq(self):
900
    """Check prerequisites.
901

902
    Transform the list of checks we're going to skip into a set and check that
903
    all its members are valid.
904

905
    """
906
    self.skip_set = frozenset(self.op.skip_checks)
907
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
908
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
909

    
910
  def BuildHooksEnv(self):
911
    """Build hooks env.
912

913
    Cluster-Verify hooks just rone in the post phase and their failure makes
914
    the output be logged in the verify output and the verification to fail.
915

916
    """
917
    all_nodes = self.cfg.GetNodeList()
918
    env = {
919
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
920
      }
921
    for node in self.cfg.GetAllNodesInfo().values():
922
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
923

    
924
    return env, [], all_nodes
925

    
926
  def Exec(self, feedback_fn):
927
    """Verify integrity of cluster, performing various test on nodes.
928

929
    """
930
    bad = False
931
    feedback_fn("* Verifying global settings")
932
    for msg in self.cfg.VerifyConfig():
933
      feedback_fn("  - ERROR: %s" % msg)
934

    
935
    vg_name = self.cfg.GetVGName()
936
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
937
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
938
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
939
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
940
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
941
                        for iname in instancelist)
942
    i_non_redundant = [] # Non redundant instances
943
    i_non_a_balanced = [] # Non auto-balanced instances
944
    n_offline = [] # List of offline nodes
945
    n_drained = [] # List of nodes being drained
946
    node_volume = {}
947
    node_instance = {}
948
    node_info = {}
949
    instance_cfg = {}
950

    
951
    # FIXME: verify OS list
952
    # do local checksums
953
    master_files = [constants.CLUSTER_CONF_FILE]
954

    
955
    file_names = ssconf.SimpleStore().GetFileList()
956
    file_names.append(constants.SSL_CERT_FILE)
957
    file_names.append(constants.RAPI_CERT_FILE)
958
    file_names.extend(master_files)
959

    
960
    local_checksums = utils.FingerprintFiles(file_names)
961

    
962
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
963
    node_verify_param = {
964
      constants.NV_FILELIST: file_names,
965
      constants.NV_NODELIST: [node.name for node in nodeinfo
966
                              if not node.offline],
967
      constants.NV_HYPERVISOR: hypervisors,
968
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
969
                                  node.secondary_ip) for node in nodeinfo
970
                                 if not node.offline],
971
      constants.NV_INSTANCELIST: hypervisors,
972
      constants.NV_VERSION: None,
973
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
974
      }
975
    if vg_name is not None:
976
      node_verify_param[constants.NV_VGLIST] = None
977
      node_verify_param[constants.NV_LVLIST] = vg_name
978
      node_verify_param[constants.NV_DRBDLIST] = None
979
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
980
                                           self.cfg.GetClusterName())
981

    
982
    cluster = self.cfg.GetClusterInfo()
983
    master_node = self.cfg.GetMasterNode()
984
    all_drbd_map = self.cfg.ComputeDRBDMap()
985

    
986
    for node_i in nodeinfo:
987
      node = node_i.name
988
      nresult = all_nvinfo[node].data
989

    
990
      if node_i.offline:
991
        feedback_fn("* Skipping offline node %s" % (node,))
992
        n_offline.append(node)
993
        continue
994

    
995
      if node == master_node:
996
        ntype = "master"
997
      elif node_i.master_candidate:
998
        ntype = "master candidate"
999
      elif node_i.drained:
1000
        ntype = "drained"
1001
        n_drained.append(node)
1002
      else:
1003
        ntype = "regular"
1004
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1005

    
1006
      if all_nvinfo[node].failed or not isinstance(nresult, dict):
1007
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
1008
        bad = True
1009
        continue
1010

    
1011
      node_drbd = {}
1012
      for minor, instance in all_drbd_map[node].items():
1013
        if instance not in instanceinfo:
1014
          feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1015
                      instance)
1016
          # ghost instance should not be running, but otherwise we
1017
          # don't give double warnings (both ghost instance and
1018
          # unallocated minor in use)
1019
          node_drbd[minor] = (instance, False)
1020
        else:
1021
          instance = instanceinfo[instance]
1022
          node_drbd[minor] = (instance.name, instance.admin_up)
1023
      result = self._VerifyNode(node_i, file_names, local_checksums,
1024
                                nresult, feedback_fn, master_files,
1025
                                node_drbd, vg_name)
1026
      bad = bad or result
1027

    
1028
      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1029
      if vg_name is None:
1030
        node_volume[node] = {}
1031
      elif isinstance(lvdata, basestring):
1032
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1033
                    (node, utils.SafeEncode(lvdata)))
1034
        bad = True
1035
        node_volume[node] = {}
1036
      elif not isinstance(lvdata, dict):
1037
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1038
        bad = True
1039
        continue
1040
      else:
1041
        node_volume[node] = lvdata
1042

    
1043
      # node_instance
1044
      idata = nresult.get(constants.NV_INSTANCELIST, None)
1045
      if not isinstance(idata, list):
1046
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1047
                    (node,))
1048
        bad = True
1049
        continue
1050

    
1051
      node_instance[node] = idata
1052

    
1053
      # node_info
1054
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
1055
      if not isinstance(nodeinfo, dict):
1056
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1057
        bad = True
1058
        continue
1059

    
1060
      try:
1061
        node_info[node] = {
1062
          "mfree": int(nodeinfo['memory_free']),
1063
          "pinst": [],
1064
          "sinst": [],
1065
          # dictionary holding all instances this node is secondary for,
1066
          # grouped by their primary node. Each key is a cluster node, and each
1067
          # value is a list of instances which have the key as primary and the
1068
          # current node as secondary.  this is handy to calculate N+1 memory
1069
          # availability if you can only failover from a primary to its
1070
          # secondary.
1071
          "sinst-by-pnode": {},
1072
        }
1073
        # FIXME: devise a free space model for file based instances as well
1074
        if vg_name is not None:
1075
          if (constants.NV_VGLIST not in nresult or
1076
              vg_name not in nresult[constants.NV_VGLIST]):
1077
            feedback_fn("  - ERROR: node %s didn't return data for the"
1078
                        " volume group '%s' - it is either missing or broken" %
1079
                        (node, vg_name))
1080
            bad = True
1081
            continue
1082
          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1083
      except (ValueError, KeyError):
1084
        feedback_fn("  - ERROR: invalid nodeinfo value returned"
1085
                    " from node %s" % (node,))
1086
        bad = True
1087
        continue
1088

    
1089
    node_vol_should = {}
1090

    
1091
    for instance in instancelist:
1092
      feedback_fn("* Verifying instance %s" % instance)
1093
      inst_config = instanceinfo[instance]
1094
      result =  self._VerifyInstance(instance, inst_config, node_volume,
1095
                                     node_instance, feedback_fn, n_offline)
1096
      bad = bad or result
1097
      inst_nodes_offline = []
1098

    
1099
      inst_config.MapLVsByNode(node_vol_should)
1100

    
1101
      instance_cfg[instance] = inst_config
1102

    
1103
      pnode = inst_config.primary_node
1104
      if pnode in node_info:
1105
        node_info[pnode]['pinst'].append(instance)
1106
      elif pnode not in n_offline:
1107
        feedback_fn("  - ERROR: instance %s, connection to primary node"
1108
                    " %s failed" % (instance, pnode))
1109
        bad = True
1110

    
1111
      if pnode in n_offline:
1112
        inst_nodes_offline.append(pnode)
1113

    
1114
      # If the instance is non-redundant we cannot survive losing its primary
1115
      # node, so we are not N+1 compliant. On the other hand we have no disk
1116
      # templates with more than one secondary so that situation is not well
1117
      # supported either.
1118
      # FIXME: does not support file-backed instances
1119
      if len(inst_config.secondary_nodes) == 0:
1120
        i_non_redundant.append(instance)
1121
      elif len(inst_config.secondary_nodes) > 1:
1122
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
1123
                    % instance)
1124

    
1125
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1126
        i_non_a_balanced.append(instance)
1127

    
1128
      for snode in inst_config.secondary_nodes:
1129
        if snode in node_info:
1130
          node_info[snode]['sinst'].append(instance)
1131
          if pnode not in node_info[snode]['sinst-by-pnode']:
1132
            node_info[snode]['sinst-by-pnode'][pnode] = []
1133
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1134
        elif snode not in n_offline:
1135
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
1136
                      " %s failed" % (instance, snode))
1137
          bad = True
1138
        if snode in n_offline:
1139
          inst_nodes_offline.append(snode)
1140

    
1141
      if inst_nodes_offline:
1142
        # warn that the instance lives on offline nodes, and set bad=True
1143
        feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1144
                    ", ".join(inst_nodes_offline))
1145
        bad = True
1146

    
1147
    feedback_fn("* Verifying orphan volumes")
1148
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1149
                                       feedback_fn)
1150
    bad = bad or result
1151

    
1152
    feedback_fn("* Verifying remaining instances")
1153
    result = self._VerifyOrphanInstances(instancelist, node_instance,
1154
                                         feedback_fn)
1155
    bad = bad or result
1156

    
1157
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1158
      feedback_fn("* Verifying N+1 Memory redundancy")
1159
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1160
      bad = bad or result
1161

    
1162
    feedback_fn("* Other Notes")
1163
    if i_non_redundant:
1164
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1165
                  % len(i_non_redundant))
1166

    
1167
    if i_non_a_balanced:
1168
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1169
                  % len(i_non_a_balanced))
1170

    
1171
    if n_offline:
1172
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1173

    
1174
    if n_drained:
1175
      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1176

    
1177
    return not bad
1178

    
1179
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res.failed or res.data is False or not isinstance(res.data, list):
            if res.offline:
              # no need to warn or set fail return value
              continue
            feedback_fn("    Communication failure in hooks execution")
            lu_result = 1
            continue
          for script, hkr, output in res.data:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result

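# Note (sketch): the only optional check consulted by LUVerifyCluster.Exec
# above is the N+1 memory test, so a caller wanting to skip it would submit
# the opcode with something like
#
#   skip_checks = [constants.VERIFY_NPLUSONE_MEM]
#
# while any entry outside constants.VERIFY_OPTIONAL_CHECKS is rejected by
# CheckPrereq.
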
class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (not inst.admin_up or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
      # node_volume
      lvs = node_lvs[node]
      if lvs.failed:
        if not lvs.offline:
          self.LogWarning("Connection to node %s failed: %s" %
                          (node, lvs.data))
        continue
      lvs = lvs.data
      if isinstance(lvs, basestring):
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
        res_nlvm[node] = lvs
        continue
      elif not isinstance(lvs, dict):
        logging.warning("Connection to node %s failed or invalid data"
                        " returned", node)
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result

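# Illustration: the tuple returned by LUVerifyDisks.Exec above unpacks as
#
#   nodes_unreachable, node_lvm_errors, degraded_instances, missing_lvs = \
#       result
#
# where nodes_unreachable is a list of node names, node_lvm_errors maps a
# node name to its LVM error string, degraded_instances lists instances
# with offline LVs, and missing_lvs maps an instance name to the list of
# (node, volume) pairs that were not found.
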
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    if result.failed or not result.data:
      raise errors.OpExecError("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetNodeList()
      try:
        node_list.remove(master)
      except ValueError:
        pass
      result = self.rpc.call_upload_file(node_list,
                                         constants.SSH_KNOWN_HOSTS_FILE)
      for to_node, to_result in result.iteritems():
        if to_result.failed or not to_result.data:
          logging.error("Copy of file %s to node %s failed",
                        constants.SSH_KNOWN_HOSTS_FILE, to_node)

    finally:
      result = self.rpc.call_node_start_master(master, False)
      if result.failed or not result.data:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV

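# Illustration (simplified construction): _RecursiveCheckIfLVMBased only
# inspects the dev_type and children attributes, so a DRBD disk backed by
# two logical volumes is reported as lvm-based via the recursion:
#
#   lv_data = objects.Disk(dev_type=constants.LD_LV, size=1024)
#   lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128)
#   drbd = objects.Disk(dev_type=constants.LD_DRBD8, size=1024,
#                       children=[lv_data, lv_meta])
#   _RecursiveCheckIfLVMBased(drbd)  # -> True
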
class LUSetClusterParams(LogicalUnit):
1412
  """Change the parameters of the cluster.
1413

1414
  """
1415
  HPATH = "cluster-modify"
1416
  HTYPE = constants.HTYPE_CLUSTER
1417
  _OP_REQP = []
1418
  REQ_BGL = False
1419

    
1420
  def CheckArguments(self):
1421
    """Check parameters
1422

1423
    """
1424
    if not hasattr(self.op, "candidate_pool_size"):
1425
      self.op.candidate_pool_size = None
1426
    if self.op.candidate_pool_size is not None:
1427
      try:
1428
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1429
      except (ValueError, TypeError), err:
1430
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1431
                                   str(err))
1432
      if self.op.candidate_pool_size < 1:
1433
        raise errors.OpPrereqError("At least one master candidate needed")
1434

    
1435
  def ExpandNames(self):
1436
    # FIXME: in the future maybe other cluster params won't require checking on
1437
    # all nodes to be modified.
1438
    self.needed_locks = {
1439
      locking.LEVEL_NODE: locking.ALL_SET,
1440
    }
1441
    self.share_locks[locking.LEVEL_NODE] = 1
1442

    
1443
  def BuildHooksEnv(self):
1444
    """Build hooks env.
1445

1446
    """
1447
    env = {
1448
      "OP_TARGET": self.cfg.GetClusterName(),
1449
      "NEW_VG_NAME": self.op.vg_name,
1450
      }
1451
    mn = self.cfg.GetMasterNode()
1452
    return env, [mn], [mn]
1453

    
1454
  def CheckPrereq(self):
1455
    """Check prerequisites.
1456

1457
    This checks whether the given params don't conflict and
1458
    if the given volume group is valid.
1459

1460
    """
1461
    if self.op.vg_name is not None and not self.op.vg_name:
1462
      instances = self.cfg.GetAllInstancesInfo().values()
1463
      for inst in instances:
1464
        for disk in inst.disks:
1465
          if _RecursiveCheckIfLVMBased(disk):
1466
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1467
                                       " lvm-based instances exist")
1468

    
1469
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1470

    
1471
    # if vg_name not None, checks given volume group on all nodes
1472
    if self.op.vg_name:
1473
      vglist = self.rpc.call_vg_list(node_list)
1474
      for node in node_list:
1475
        if vglist[node].failed:
1476
          # ignoring down node
1477
          self.LogWarning("Node %s unreachable/error, ignoring" % node)
1478
          continue
1479
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1480
                                              self.op.vg_name,
1481
                                              constants.MIN_VG_SIZE)
1482
        if vgstatus:
1483
          raise errors.OpPrereqError("Error on node '%s': %s" %
1484
                                     (node, vgstatus))
1485

    
1486
    self.cluster = cluster = self.cfg.GetClusterInfo()
1487
    # validate beparams changes
1488
    if self.op.beparams:
1489
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1490
      self.new_beparams = cluster.FillDict(
1491
        cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1492

    
1493
    # hypervisor list/parameters
1494
    self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1495
    if self.op.hvparams:
1496
      if not isinstance(self.op.hvparams, dict):
1497
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1498
      for hv_name, hv_dict in self.op.hvparams.items():
1499
        if hv_name not in self.new_hvparams:
1500
          self.new_hvparams[hv_name] = hv_dict
1501
        else:
1502
          self.new_hvparams[hv_name].update(hv_dict)
1503

    
1504
    if self.op.enabled_hypervisors is not None:
1505
      self.hv_list = self.op.enabled_hypervisors
1506
    else:
1507
      self.hv_list = cluster.enabled_hypervisors
1508

    
1509
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1510
      # either the enabled list has changed, or the parameters have, validate
1511
      for hv_name, hv_params in self.new_hvparams.items():
1512
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1513
            (self.op.enabled_hypervisors and
1514
             hv_name in self.op.enabled_hypervisors)):
1515
          # either this is a new hypervisor, or its parameters have changed
1516
          hv_class = hypervisor.GetHypervisor(hv_name)
1517
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1518
          hv_class.CheckParameterSyntax(hv_params)
1519
          _CheckHVParams(self, node_list, hv_name, hv_params)
1520

    
1521
  def Exec(self, feedback_fn):
1522
    """Change the parameters of the cluster.
1523

1524
    """
1525
    if self.op.vg_name is not None:
1526
      new_volume = self.op.vg_name
1527
      if not new_volume:
1528
        new_volume = None
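        # an empty vg_name means "disable LVM storage"; this is recorded in
        # the configuration as having no volume group at all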
1529
      if new_volume != self.cfg.GetVGName():
1530
        self.cfg.SetVGName(new_volume)
1531
      else:
1532
        feedback_fn("Cluster LVM configuration already in desired"
1533
                    " state, not changing")
1534
    if self.op.hvparams:
1535
      self.cluster.hvparams = self.new_hvparams
1536
    if self.op.enabled_hypervisors is not None:
1537
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1538
    if self.op.beparams:
1539
      self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1540
    if self.op.candidate_pool_size is not None:
1541
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1542

    
1543
    self.cfg.Update(self.cluster)
1544

    
1545
    # we want to update nodes after the cluster so that if any errors
1546
    # happen, we have recorded and saved the cluster info
1547
    if self.op.candidate_pool_size is not None:
1548
      _AdjustCandidatePool(self)
1549

    
1550

    
1551
def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1552
  """Distribute additional files which are part of the cluster configuration.
1553

1554
  ConfigWriter takes care of distributing the config and ssconf files, but
1555
  there are more files which should be distributed to all nodes. This function
1556
  makes sure those are copied.
1557

1558
  @param lu: calling logical unit
1559
  @param additional_nodes: list of nodes not in the config to distribute to
1560

1561
  """
1562
  # 1. Gather target nodes
1563
  myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1564
  dist_nodes = lu.cfg.GetNodeList()
1565
  if additional_nodes is not None:
1566
    dist_nodes.extend(additional_nodes)
1567
  if myself.name in dist_nodes:
1568
    dist_nodes.remove(myself.name)
1569
  # 2. Gather files to distribute
1570
  dist_files = set([constants.ETC_HOSTS,
1571
                    constants.SSH_KNOWN_HOSTS_FILE,
1572
                    constants.RAPI_CERT_FILE,
1573
                    constants.RAPI_USERS_FILE,
1574
                   ])
1575

    
1576
  enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1577
  for hv_name in enabled_hypervisors:
1578
    hv_class = hypervisor.GetHypervisor(hv_name)
1579
    dist_files.update(hv_class.GetAncillaryFiles())
1580

    
1581
  # 3. Perform the files upload
1582
  for fname in dist_files:
1583
    if os.path.exists(fname):
1584
      result = lu.rpc.call_upload_file(dist_nodes, fname)
1585
      for to_node, to_result in result.items():
1586
        if to_result.failed or not to_result.data:
1587
          logging.error("Copy of file %s to node %s failed", fname, to_node)
1588

    
1589

    
1590
class LURedistributeConfig(NoHooksLU):
1591
  """Force the redistribution of cluster configuration.
1592

1593
  This is a very simple LU.
1594

1595
  """
1596
  _OP_REQP = []
1597
  REQ_BGL = False
1598

    
1599
  def ExpandNames(self):
1600
    self.needed_locks = {
1601
      locking.LEVEL_NODE: locking.ALL_SET,
1602
    }
1603
    self.share_locks[locking.LEVEL_NODE] = 1
1604

    
1605
  def CheckPrereq(self):
1606
    """Check prerequisites.
1607

1608
    """
1609

    
1610
  def Exec(self, feedback_fn):
1611
    """Redistribute the configuration.
1612

1613
    """
1614
    self.cfg.Update(self.cfg.GetClusterInfo())
1615
    _RedistributeAncillaryFiles(self)
1616

    
1617

    
1618
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1619
  """Sleep and poll for an instance's disk to sync.
1620

1621
  """
1622
  if not instance.disks:
1623
    return True
1624

    
1625
  if not oneshot:
1626
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1627

    
1628
  node = instance.primary_node
1629

    
1630
  for dev in instance.disks:
1631
    lu.cfg.SetDiskID(dev, node)
1632

    
1633
  retries = 0
1634
  while True:
1635
    max_time = 0
1636
    done = True
1637
    cumul_degraded = False
1638
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1639
    if rstats.failed or not rstats.data:
1640
      lu.LogWarning("Can't get any data from node %s", node)
1641
      retries += 1
1642
      if retries >= 10:
1643
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1644
                                 " aborting." % node)
1645
      time.sleep(6)
1646
      continue
1647
    rstats = rstats.data
1648
    retries = 0
1649
    for i, mstat in enumerate(rstats):
1650
      if mstat is None:
1651
        lu.LogWarning("Can't compute data for node %s/%s",
1652
                           node, instance.disks[i].iv_name)
1653
        continue
1654
      # we ignore the ldisk parameter
1655
      perc_done, est_time, is_degraded, _ = mstat
1656
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1657
      if perc_done is not None:
1658
        done = False
1659
        if est_time is not None:
1660
          rem_time = "%d estimated seconds remaining" % est_time
1661
          max_time = est_time
1662
        else:
1663
          rem_time = "no time estimate"
1664
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1665
                        (instance.disks[i].iv_name, perc_done, rem_time))
1666
    if done or oneshot:
1667
      break
1668

    
1669
    time.sleep(min(60, max_time))
1670

    
1671
  if done:
1672
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1673
  return not cumul_degraded
1674

    
1675

    
1676
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1677
  """Check that mirrors are not degraded.
1678

1679
  The ldisk parameter, if True, will change the test from the
1680
  is_degraded attribute (which represents overall non-ok status for
1681
  the device(s)) to the ldisk (representing the local storage status).
1682

1683
  """
1684
  lu.cfg.SetDiskID(dev, node)
1685
  if ldisk:
1686
    idx = 6
1687
  else:
1688
    idx = 5
1689

    
1690
  result = True
1691
  if on_primary or dev.AssembleOnSecondary():
1692
    rstats = lu.rpc.call_blockdev_find(node, dev)
1693
    msg = rstats.RemoteFailMsg()
1694
    if msg:
1695
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1696
      result = False
1697
    elif not rstats.payload:
1698
      lu.LogWarning("Can't find disk on node %s", node)
1699
      result = False
1700
    else:
1701
      result = result and (not rstats.payload[idx])
1702
  if dev.children:
1703
    for child in dev.children:
1704
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1705

    
1706
  return result
1707

    
1708

    
1709
class LUDiagnoseOS(NoHooksLU):
1710
  """Logical unit for OS diagnose/query.
1711

1712
  """
1713
  _OP_REQP = ["output_fields", "names"]
1714
  REQ_BGL = False
1715
  _FIELDS_STATIC = utils.FieldSet()
1716
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1717

    
1718
  def ExpandNames(self):
1719
    if self.op.names:
1720
      raise errors.OpPrereqError("Selective OS query not supported")
1721

    
1722
    _CheckOutputFields(static=self._FIELDS_STATIC,
1723
                       dynamic=self._FIELDS_DYNAMIC,
1724
                       selected=self.op.output_fields)
1725

    
1726
    # Lock all nodes, in shared mode
1727
    # Temporary removal of locks, should be reverted later
1728
    # TODO: reintroduce locks when they are lighter-weight
1729
    self.needed_locks = {}
1730
    #self.share_locks[locking.LEVEL_NODE] = 1
1731
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1732

    
1733
  def CheckPrereq(self):
1734
    """Check prerequisites.
1735

1736
    """
1737

    
1738
  @staticmethod
1739
  def _DiagnoseByOS(node_list, rlist):
1740
    """Remaps a per-node return list into an a per-os per-node dictionary
1741

1742
    @param node_list: a list with the names of all nodes
1743
    @param rlist: a map with node names as keys and OS objects as values
1744

1745
    @rtype: dict
1746
    @return: a dictionary with osnames as keys and as value another map, with
1747
        nodes as keys and list of OS objects as values, eg::
1748

1749
          {"debian-etch": {"node1": [<object>,...],
1750
                           "node2": [<object>,]}
1751
          }
1752

1753
    """
1754
    all_os = {}
1755
    # we build here the list of nodes that didn't fail the RPC (at RPC
1756
    # level), so that nodes with a non-responding node daemon don't
1757
    # make all OSes invalid
1758
    good_nodes = [node_name for node_name in rlist
1759
                  if not rlist[node_name].failed]
1760
    for node_name, nr in rlist.iteritems():
1761
      if nr.failed or not nr.data:
1762
        continue
1763
      for os_obj in nr.data:
1764
        if os_obj.name not in all_os:
1765
          # build a list of nodes for this os containing empty lists
1766
          # for each node in node_list
1767
          all_os[os_obj.name] = {}
1768
          for nname in good_nodes:
1769
            all_os[os_obj.name][nname] = []
1770
        all_os[os_obj.name][node_name].append(os_obj)
1771
    return all_os
1772

    
1773
  def Exec(self, feedback_fn):
1774
    """Compute the list of OSes.
1775

1776
    """
1777
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1778
    node_data = self.rpc.call_os_diagnose(valid_nodes)
1779
    if node_data == False:
1780
      raise errors.OpExecError("Can't gather the list of OSes")
1781
    pol = self._DiagnoseByOS(valid_nodes, node_data)
1782
    output = []
1783
    for os_name, os_data in pol.iteritems():
1784
      row = []
1785
      for field in self.op.output_fields:
1786
        if field == "name":
1787
          val = os_name
1788
        elif field == "valid":
1789
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1790
        elif field == "node_status":
1791
          val = {}
1792
          for node_name, nos_list in os_data.iteritems():
1793
            val[node_name] = [(v.status, v.path) for v in nos_list]
1794
        else:
1795
          raise errors.ParameterError(field)
1796
        row.append(val)
1797
      output.append(row)
1798

    
1799
    return output
1800

    
1801

    
1802
class LURemoveNode(LogicalUnit):
1803
  """Logical unit for removing a node.
1804

1805
  """
1806
  HPATH = "node-remove"
1807
  HTYPE = constants.HTYPE_NODE
1808
  _OP_REQP = ["node_name"]
1809

    
1810
  def BuildHooksEnv(self):
1811
    """Build hooks env.
1812

1813
    This doesn't run on the target node in the pre phase as a failed
1814
    node would then be impossible to remove.
1815

1816
    """
1817
    env = {
1818
      "OP_TARGET": self.op.node_name,
1819
      "NODE_NAME": self.op.node_name,
1820
      }
1821
    all_nodes = self.cfg.GetNodeList()
1822
    all_nodes.remove(self.op.node_name)
1823
    return env, all_nodes, all_nodes
1824

    
1825
  def CheckPrereq(self):
1826
    """Check prerequisites.
1827

1828
    This checks:
1829
     - the node exists in the configuration
1830
     - it does not have primary or secondary instances
1831
     - it's not the master
1832

1833
    Any errors are signalled by raising errors.OpPrereqError.
1834

1835
    """
1836
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1837
    if node is None:
1838
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1839

    
1840
    instance_list = self.cfg.GetInstanceList()
1841

    
1842
    masternode = self.cfg.GetMasterNode()
1843
    if node.name == masternode:
1844
      raise errors.OpPrereqError("Node is the master node,"
1845
                                 " you need to failover first.")
1846

    
1847
    for instance_name in instance_list:
1848
      instance = self.cfg.GetInstanceInfo(instance_name)
1849
      if node.name in instance.all_nodes:
1850
        raise errors.OpPrereqError("Instance %s is still running on the node,"
1851
                                   " please remove first." % instance_name)
1852
    self.op.node_name = node.name
1853
    self.node = node
1854

    
1855
  def Exec(self, feedback_fn):
1856
    """Removes the node from the cluster.
1857

1858
    """
1859
    node = self.node
1860
    logging.info("Stopping the node daemon and removing configs from node %s",
1861
                 node.name)
1862

    
1863
    self.context.RemoveNode(node.name)
1864

    
1865
    self.rpc.call_node_leave_cluster(node.name)
1866

    
1867
    # Promote nodes to master candidate as needed
1868
    _AdjustCandidatePool(self)
1869

    
1870

    
1871
class LUQueryNodes(NoHooksLU):
1872
  """Logical unit for querying nodes.
1873

1874
  """
1875
  _OP_REQP = ["output_fields", "names", "use_locking"]
1876
  REQ_BGL = False
1877
  _FIELDS_DYNAMIC = utils.FieldSet(
1878
    "dtotal", "dfree",
1879
    "mtotal", "mnode", "mfree",
1880
    "bootid",
1881
    "ctotal", "cnodes", "csockets",
1882
    )
1883

    
1884
  _FIELDS_STATIC = utils.FieldSet(
1885
    "name", "pinst_cnt", "sinst_cnt",
1886
    "pinst_list", "sinst_list",
1887
    "pip", "sip", "tags",
1888
    "serial_no",
1889
    "master_candidate",
1890
    "master",
1891
    "offline",
1892
    "drained",
1893
    )
1894

    
1895
  def ExpandNames(self):
1896
    _CheckOutputFields(static=self._FIELDS_STATIC,
1897
                       dynamic=self._FIELDS_DYNAMIC,
1898
                       selected=self.op.output_fields)
1899

    
1900
    self.needed_locks = {}
1901
    self.share_locks[locking.LEVEL_NODE] = 1
1902

    
1903
    if self.op.names:
1904
      self.wanted = _GetWantedNodes(self, self.op.names)
1905
    else:
1906
      self.wanted = locking.ALL_SET
1907

    
1908
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1909
    self.do_locking = self.do_node_query and self.op.use_locking
1910
    if self.do_locking:
1911
      # if we don't request only static fields, we need to lock the nodes
1912
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
1913

    
1914

    
1915
  def CheckPrereq(self):
1916
    """Check prerequisites.
1917

1918
    """
1919
    # The validation of the node list is done in the _GetWantedNodes,
1920
    # if non empty, and if empty, there's no validation to do
1921
    pass
1922

    
1923
  def Exec(self, feedback_fn):
1924
    """Computes the list of nodes and their attributes.
1925

1926
    """
1927
    all_info = self.cfg.GetAllNodesInfo()
1928
    if self.do_locking:
1929
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1930
    elif self.wanted != locking.ALL_SET:
1931
      nodenames = self.wanted
1932
      missing = set(nodenames).difference(all_info.keys())
1933
      if missing:
1934
        raise errors.OpExecError(
1935
          "Some nodes were removed before retrieving their data: %s" % missing)
1936
    else:
1937
      nodenames = all_info.keys()
1938

    
1939
    nodenames = utils.NiceSort(nodenames)
1940
    nodelist = [all_info[name] for name in nodenames]
1941

    
1942
    # begin data gathering
1943

    
1944
    if self.do_node_query:
1945
      live_data = {}
1946
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1947
                                          self.cfg.GetHypervisorType())
1948
      for name in nodenames:
1949
        nodeinfo = node_data[name]
1950
        if not nodeinfo.failed and nodeinfo.data:
1951
          nodeinfo = nodeinfo.data
1952
          fn = utils.TryConvert
1953
          live_data[name] = {
1954
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1955
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1956
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
1957
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1958
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
1959
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1960
            "bootid": nodeinfo.get('bootid', None),
1961
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
1962
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
1963
            }
1964
        else:
1965
          live_data[name] = {}
1966
    else:
1967
      live_data = dict.fromkeys(nodenames, {})
1968

    
1969
    node_to_primary = dict([(name, set()) for name in nodenames])
1970
    node_to_secondary = dict([(name, set()) for name in nodenames])
1971

    
1972
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
1973
                             "sinst_cnt", "sinst_list"))
1974
    if inst_fields & frozenset(self.op.output_fields):
1975
      instancelist = self.cfg.GetInstanceList()
1976

    
1977
      for instance_name in instancelist:
1978
        inst = self.cfg.GetInstanceInfo(instance_name)
1979
        if inst.primary_node in node_to_primary:
1980
          node_to_primary[inst.primary_node].add(inst.name)
1981
        for secnode in inst.secondary_nodes:
1982
          if secnode in node_to_secondary:
1983
            node_to_secondary[secnode].add(inst.name)
1984

    
1985
    master_node = self.cfg.GetMasterNode()
1986

    
1987
    # end data gathering
1988

    
1989
    output = []
1990
    for node in nodelist:
1991
      node_output = []
1992
      for field in self.op.output_fields:
1993
        if field == "name":
1994
          val = node.name
1995
        elif field == "pinst_list":
1996
          val = list(node_to_primary[node.name])
1997
        elif field == "sinst_list":
1998
          val = list(node_to_secondary[node.name])
1999
        elif field == "pinst_cnt":
2000
          val = len(node_to_primary[node.name])
2001
        elif field == "sinst_cnt":
2002
          val = len(node_to_secondary[node.name])
2003
        elif field == "pip":
2004
          val = node.primary_ip
2005
        elif field == "sip":
2006
          val = node.secondary_ip
2007
        elif field == "tags":
2008
          val = list(node.GetTags())
2009
        elif field == "serial_no":
2010
          val = node.serial_no
2011
        elif field == "master_candidate":
2012
          val = node.master_candidate
2013
        elif field == "master":
2014
          val = node.name == master_node
2015
        elif field == "offline":
2016
          val = node.offline
2017
        elif field == "drained":
2018
          val = node.drained
2019
        elif self._FIELDS_DYNAMIC.Matches(field):
2020
          val = live_data[node.name].get(field, None)
2021
        else:
2022
          raise errors.ParameterError(field)
2023
        node_output.append(val)
2024
      output.append(node_output)
2025

    
2026
    return output
2027

    
2028

    
2029
class LUQueryNodeVolumes(NoHooksLU):
2030
  """Logical unit for getting volumes on node(s).
2031

2032
  """
2033
  _OP_REQP = ["nodes", "output_fields"]
2034
  REQ_BGL = False
2035
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2036
  _FIELDS_STATIC = utils.FieldSet("node")
2037

    
2038
  def ExpandNames(self):
2039
    _CheckOutputFields(static=self._FIELDS_STATIC,
2040
                       dynamic=self._FIELDS_DYNAMIC,
2041
                       selected=self.op.output_fields)
2042

    
2043
    self.needed_locks = {}
2044
    self.share_locks[locking.LEVEL_NODE] = 1
2045
    if not self.op.nodes:
2046
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2047
    else:
2048
      self.needed_locks[locking.LEVEL_NODE] = \
2049
        _GetWantedNodes(self, self.op.nodes)
2050

    
2051
  def CheckPrereq(self):
2052
    """Check prerequisites.
2053

2054
    This checks that the fields required are valid output fields.
2055

2056
    """
2057
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2058

    
2059
  def Exec(self, feedback_fn):
2060
    """Computes the list of nodes and their attributes.
2061

2062
    """
2063
    nodenames = self.nodes
2064
    volumes = self.rpc.call_node_volumes(nodenames)
2065

    
2066
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
2067
             in self.cfg.GetInstanceList()]
2068

    
2069
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2070

    
2071
    output = []
2072
    for node in nodenames:
2073
      if node not in volumes or volumes[node].failed or not volumes[node].data:
2074
        continue
2075

    
2076
      node_vols = volumes[node].data[:]
2077
      node_vols.sort(key=lambda vol: vol['dev'])
2078

    
2079
      for vol in node_vols:
2080
        node_output = []
2081
        for field in self.op.output_fields:
2082
          if field == "node":
2083
            val = node
2084
          elif field == "phys":
2085
            val = vol['dev']
2086
          elif field == "vg":
2087
            val = vol['vg']
2088
          elif field == "name":
2089
            val = vol['name']
2090
          elif field == "size":
2091
            val = int(float(vol['size']))
2092
          elif field == "instance":
2093
            for inst in ilist:
2094
              if node not in lv_by_node[inst]:
2095
                continue
2096
              if vol['name'] in lv_by_node[inst][node]:
2097
                val = inst.name
2098
                break
2099
            else:
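              # for/else: this branch runs only when the loop above found
              # no instance owning this logical volume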
2100
              val = '-'
2101
          else:
2102
            raise errors.ParameterError(field)
2103
          node_output.append(str(val))
2104

    
2105
        output.append(node_output)
2106

    
2107
    return output
2108

    
2109

    
2110
class LUAddNode(LogicalUnit):
2111
  """Logical unit for adding node to the cluster.
2112

2113
  """
2114
  HPATH = "node-add"
2115
  HTYPE = constants.HTYPE_NODE
2116
  _OP_REQP = ["node_name"]
2117

    
2118
  def BuildHooksEnv(self):
2119
    """Build hooks env.
2120

2121
    This will run on all nodes before, and on all nodes + the new node after.
2122

2123
    """
2124
    env = {
2125
      "OP_TARGET": self.op.node_name,
2126
      "NODE_NAME": self.op.node_name,
2127
      "NODE_PIP": self.op.primary_ip,
2128
      "NODE_SIP": self.op.secondary_ip,
2129
      }
2130
    nodes_0 = self.cfg.GetNodeList()
2131
    nodes_1 = nodes_0 + [self.op.node_name, ]
2132
    return env, nodes_0, nodes_1
2133

    
2134
  def CheckPrereq(self):
2135
    """Check prerequisites.
2136

2137
    This checks:
2138
     - the new node is not already in the config
2139
     - it is resolvable
2140
     - its parameters (single/dual homed) matches the cluster
2141

2142
    Any errors are signalled by raising errors.OpPrereqError.
2143

2144
    """
2145
    node_name = self.op.node_name
2146
    cfg = self.cfg
2147

    
2148
    dns_data = utils.HostInfo(node_name)
2149

    
2150
    node = dns_data.name
2151
    primary_ip = self.op.primary_ip = dns_data.ip
2152
    secondary_ip = getattr(self.op, "secondary_ip", None)
2153
    if secondary_ip is None:
2154
      secondary_ip = primary_ip
2155
    if not utils.IsValidIP(secondary_ip):
2156
      raise errors.OpPrereqError("Invalid secondary IP given")
2157
    self.op.secondary_ip = secondary_ip
2158

    
2159
    node_list = cfg.GetNodeList()
2160
    if not self.op.readd and node in node_list:
2161
      raise errors.OpPrereqError("Node %s is already in the configuration" %
2162
                                 node)
2163
    elif self.op.readd and node not in node_list:
2164
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2165

    
2166
    for existing_node_name in node_list:
2167
      existing_node = cfg.GetNodeInfo(existing_node_name)
2168

    
2169
      if self.op.readd and node == existing_node_name:
2170
        if (existing_node.primary_ip != primary_ip or
2171
            existing_node.secondary_ip != secondary_ip):
2172
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
2173
                                     " address configuration as before")
2174
        continue
2175

    
2176
      if (existing_node.primary_ip == primary_ip or
2177
          existing_node.secondary_ip == primary_ip or
2178
          existing_node.primary_ip == secondary_ip or
2179
          existing_node.secondary_ip == secondary_ip):
2180
        raise errors.OpPrereqError("New node ip address(es) conflict with"
2181
                                   " existing node %s" % existing_node.name)
2182

    
2183
    # check that the type of the node (single versus dual homed) is the
2184
    # same as for the master
2185
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2186
    master_singlehomed = myself.secondary_ip == myself.primary_ip
2187
    newbie_singlehomed = secondary_ip == primary_ip
2188
    if master_singlehomed != newbie_singlehomed:
2189
      if master_singlehomed:
2190
        raise errors.OpPrereqError("The master has no private ip but the"
2191
                                   " new node has one")
2192
      else:
2193
        raise errors.OpPrereqError("The master has a private ip but the"
2194
                                   " new node doesn't have one")
2195

    
2196
    # check reachability
2197
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2198
      raise errors.OpPrereqError("Node not reachable by ping")
2199

    
2200
    if not newbie_singlehomed:
2201
      # check reachability from my secondary ip to newbie's secondary ip
2202
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2203
                           source=myself.secondary_ip):
2204
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2205
                                   " based ping to noded port")
2206

    
2207
    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2208
    mc_now, _ = self.cfg.GetMasterCandidateStats()
2209
    master_candidate = mc_now < cp_size
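    # the new node is automatically promoted to master candidate if the
    # candidate pool is not yet full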
2210

    
2211
    self.new_node = objects.Node(name=node,
2212
                                 primary_ip=primary_ip,
2213
                                 secondary_ip=secondary_ip,
2214
                                 master_candidate=master_candidate,
2215
                                 offline=False, drained=False)
2216

    
2217
  def Exec(self, feedback_fn):
2218
    """Adds the new node to the cluster.
2219

2220
    """
2221
    new_node = self.new_node
2222
    node = new_node.name
2223

    
2224
    # check connectivity
2225
    result = self.rpc.call_version([node])[node]
2226
    result.Raise()
2227
    if result.data:
2228
      if constants.PROTOCOL_VERSION == result.data:
2229
        logging.info("Communication to node %s fine, sw version %s match",
2230
                     node, result.data)
2231
      else:
2232
        raise errors.OpExecError("Version mismatch master version %s,"
2233
                                 " node version %s" %
2234
                                 (constants.PROTOCOL_VERSION, result.data))
2235
    else:
2236
      raise errors.OpExecError("Cannot get version from the new node")
2237

    
2238
    # setup ssh on node
2239
    logging.info("Copy ssh key to node %s", node)
2240
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2241
    keyarray = []
2242
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2243
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2244
                priv_key, pub_key]
2245

    
2246
    for i in keyfiles:
2247
      f = open(i, 'r')
2248
      try:
2249
        keyarray.append(f.read())
2250
      finally:
2251
        f.close()
2252

    
2253
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2254
                                    keyarray[2],
2255
                                    keyarray[3], keyarray[4], keyarray[5])
2256

    
2257
    msg = result.RemoteFailMsg()
2258
    if msg:
2259
      raise errors.OpExecError("Cannot transfer ssh keys to the"
2260
                               " new node: %s" % msg)
2261

    
2262
    # Add node to our /etc/hosts, and add key to known_hosts
2263
    utils.AddHostToEtcHosts(new_node.name)
2264

    
2265
    if new_node.secondary_ip != new_node.primary_ip:
2266
      result = self.rpc.call_node_has_ip_address(new_node.name,
2267
                                                 new_node.secondary_ip)
2268
      if result.failed or not result.data:
2269
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2270
                                 " you gave (%s). Please fix and re-run this"
2271
                                 " command." % new_node.secondary_ip)
2272

    
2273
    node_verify_list = [self.cfg.GetMasterNode()]
2274
    node_verify_param = {
2275
      'nodelist': [node],
2276
      # TODO: do a node-net-test as well?
2277
    }
2278

    
2279
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2280
                                       self.cfg.GetClusterName())
2281
    for verifier in node_verify_list:
2282
      if result[verifier].failed or not result[verifier].data:
2283
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
2284
                                 " for remote verification" % verifier)
2285
      if result[verifier].data['nodelist']:
2286
        for failed in result[verifier].data['nodelist']:
2287
          feedback_fn("ssh/hostname verification failed %s -> %s" %
2288
                      (verifier, result[verifier].data['nodelist'][failed]))
2289
        raise errors.OpExecError("ssh/hostname verification failed.")
2290

    
2291
    if self.op.readd:
2292
      _RedistributeAncillaryFiles(self)
2293
      self.context.ReaddNode(new_node)
2294
    else:
2295
      _RedistributeAncillaryFiles(self, additional_nodes=[node])
2296
      self.context.AddNode(new_node)
2297

    
2298

    
2299
class LUSetNodeParams(LogicalUnit):
2300
  """Modifies the parameters of a node.
2301

2302
  """
2303
  HPATH = "node-modify"
2304
  HTYPE = constants.HTYPE_NODE
2305
  _OP_REQP = ["node_name"]
2306
  REQ_BGL = False
2307

    
2308
  def CheckArguments(self):
2309
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
2310
    if node_name is None:
2311
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2312
    self.op.node_name = node_name
2313
    _CheckBooleanOpField(self.op, 'master_candidate')
2314
    _CheckBooleanOpField(self.op, 'offline')
2315
    _CheckBooleanOpField(self.op, 'drained')
2316
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2317
    if all_mods.count(None) == 3:
2318
      raise errors.OpPrereqError("Please pass at least one modification")
2319
    if all_mods.count(True) > 1:
2320
      raise errors.OpPrereqError("Can't set the node into more than one"
2321
                                 " state at the same time")
2322

    
2323
  def ExpandNames(self):
2324
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2325

    
2326
  def BuildHooksEnv(self):
2327
    """Build hooks env.
2328

2329
    This runs on the master node.
2330

2331
    """
2332
    env = {
2333
      "OP_TARGET": self.op.node_name,
2334
      "MASTER_CANDIDATE": str(self.op.master_candidate),
2335
      "OFFLINE": str(self.op.offline),
2336
      "DRAINED": str(self.op.drained),
2337
      }
2338
    nl = [self.cfg.GetMasterNode(),
2339
          self.op.node_name]
2340
    return env, nl, nl
2341

    
2342
  def CheckPrereq(self):
2343
    """Check prerequisites.
2344

2345
    This only checks the instance list against the existing names.
2346

2347
    """
2348
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2349

    
2350
    if ((self.op.master_candidate == False or self.op.offline == True or
2351
         self.op.drained == True) and node.master_candidate):
2352
      # we will demote the node from master_candidate
2353
      if self.op.node_name == self.cfg.GetMasterNode():
2354
        raise errors.OpPrereqError("The master node has to be a"
2355
                                   " master candidate, online and not drained")
2356
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2357
      num_candidates, _ = self.cfg.GetMasterCandidateStats()
2358
      if num_candidates <= cp_size:
2359
        msg = ("Not enough master candidates (desired"
2360
               " %d, new value will be %d)" % (cp_size, num_candidates-1))
2361
        if self.op.force:
2362
          self.LogWarning(msg)
2363
        else:
2364
          raise errors.OpPrereqError(msg)
2365

    
2366
    if (self.op.master_candidate == True and
2367
        ((node.offline and not self.op.offline == False) or
2368
         (node.drained and not self.op.drained == False))):
2369
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2370
                                 " to master_candidate" % node.name)
2371

    
2372
    return
2373

    
2374
  def Exec(self, feedback_fn):
2375
    """Modifies a node.
2376

2377
    """
2378
    node = self.node
2379

    
2380
    result = []
2381
    changed_mc = False
2382

    
2383
    if self.op.offline is not None:
2384
      node.offline = self.op.offline
2385
      result.append(("offline", str(self.op.offline)))
2386
      if self.op.offline == True:
2387
        if node.master_candidate:
2388
          node.master_candidate = False
2389
          changed_mc = True
2390
          result.append(("master_candidate", "auto-demotion due to offline"))
2391
        if node.drained:
2392
          node.drained = False
2393
          result.append(("drained", "clear drained status due to offline"))
2394

    
2395
    if self.op.master_candidate is not None:
2396
      node.master_candidate = self.op.master_candidate
2397
      changed_mc = True
2398
      result.append(("master_candidate", str(self.op.master_candidate)))
2399
      if self.op.master_candidate == False:
2400
        rrc = self.rpc.call_node_demote_from_mc(node.name)
2401
        msg = rrc.RemoteFailMsg()
2402
        if msg:
2403
          self.LogWarning("Node failed to demote itself: %s" % msg)
2404

    
2405
    if self.op.drained is not None:
2406
      node.drained = self.op.drained
2407
      result.append(("drained", str(self.op.drained)))
2408
      if self.op.drained == True:
2409
        if node.master_candidate:
2410
          node.master_candidate = False
2411
          changed_mc = True
2412
          result.append(("master_candidate", "auto-demotion due to drain"))
2413
        if node.offline:
2414
          node.offline = False
2415
          result.append(("offline", "clear offline status due to drain"))
2416

    
2417
    # this will trigger configuration file update, if needed
2418
    self.cfg.Update(node)
2419
    # this will trigger job queue propagation or cleanup
2420
    if changed_mc:
2421
      self.context.ReaddNode(node)
2422

    
2423
    return result
2424

    
2425

    
2426
class LUQueryClusterInfo(NoHooksLU):
2427
  """Query cluster configuration.
2428

2429
  """
2430
  _OP_REQP = []
2431
  REQ_BGL = False
2432

    
2433
  def ExpandNames(self):
2434
    self.needed_locks = {}
2435

    
2436
  def CheckPrereq(self):
2437
    """No prerequsites needed for this LU.
2438

2439
    """
2440
    pass
2441

    
2442
  def Exec(self, feedback_fn):
2443
    """Return cluster config.
2444

2445
    """
2446
    cluster = self.cfg.GetClusterInfo()
2447
    result = {
2448
      "software_version": constants.RELEASE_VERSION,
2449
      "protocol_version": constants.PROTOCOL_VERSION,
2450
      "config_version": constants.CONFIG_VERSION,
2451
      "os_api_version": constants.OS_API_VERSION,
2452
      "export_version": constants.EXPORT_VERSION,
2453
      "architecture": (platform.architecture()[0], platform.machine()),
2454
      "name": cluster.cluster_name,
2455
      "master": cluster.master_node,
2456
      "default_hypervisor": cluster.default_hypervisor,
2457
      "enabled_hypervisors": cluster.enabled_hypervisors,
2458
      "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2459
                        for hypervisor in cluster.enabled_hypervisors]),
2460
      "beparams": cluster.beparams,
2461
      "candidate_pool_size": cluster.candidate_pool_size,
2462
      "default_bridge": cluster.default_bridge,
2463
      "master_netdev": cluster.master_netdev,
2464
      "volume_group_name": cluster.volume_group_name,
2465
      "file_storage_dir": cluster.file_storage_dir,
2466
      }
2467

    
2468
    return result
2469

    
2470

    
2471
class LUQueryConfigValues(NoHooksLU):
2472
  """Return configuration values.
2473

2474
  """
2475
  _OP_REQP = []
2476
  REQ_BGL = False
2477
  _FIELDS_DYNAMIC = utils.FieldSet()
2478
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2479

    
2480
  def ExpandNames(self):
2481
    self.needed_locks = {}
2482

    
2483
    _CheckOutputFields(static=self._FIELDS_STATIC,
2484
                       dynamic=self._FIELDS_DYNAMIC,
2485
                       selected=self.op.output_fields)
2486

    
2487
  def CheckPrereq(self):
2488
    """No prerequisites.
2489

2490
    """
2491
    pass
2492

    
2493
  def Exec(self, feedback_fn):
2494
    """Dump a representation of the cluster config to the standard output.
2495

2496
    """
2497
    values = []
2498
    for field in self.op.output_fields:
2499
      if field == "cluster_name":
2500
        entry = self.cfg.GetClusterName()
2501
      elif field == "master_node":
2502
        entry = self.cfg.GetMasterNode()
2503
      elif field == "drain_flag":
2504
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2505
      else:
2506
        raise errors.ParameterError(field)
2507
      values.append(entry)
2508
    return values
2509

    
2510

    
2511
class LUActivateInstanceDisks(NoHooksLU):
2512
  """Bring up an instance's disks.
2513

2514
  """
2515
  _OP_REQP = ["instance_name"]
2516
  REQ_BGL = False
2517

    
2518
  def ExpandNames(self):
2519
    self._ExpandAndLockInstance()
2520
    self.needed_locks[locking.LEVEL_NODE] = []
2521
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2522

    
2523
  def DeclareLocks(self, level):
2524
    if level == locking.LEVEL_NODE:
2525
      self._LockInstancesNodes()
2526

    
2527
  def CheckPrereq(self):
2528
    """Check prerequisites.
2529

2530
    This checks that the instance is in the cluster.
2531

2532
    """
2533
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2534
    assert self.instance is not None, \
2535
      "Cannot retrieve locked instance %s" % self.op.instance_name
2536
    _CheckNodeOnline(self, self.instance.primary_node)
2537

    
2538
  def Exec(self, feedback_fn):
2539
    """Activate the disks.
2540

2541
    """
2542
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2543
    if not disks_ok:
2544
      raise errors.OpExecError("Cannot activate block devices")
2545

    
2546
    return disks_info
2547

    
2548

    
2549
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2550
  """Prepare the block devices for an instance.
2551

2552
  This sets up the block devices on all nodes.
2553

2554
  @type lu: L{LogicalUnit}
2555
  @param lu: the logical unit on whose behalf we execute
2556
  @type instance: L{objects.Instance}
2557
  @param instance: the instance for whose disks we assemble
2558
  @type ignore_secondaries: boolean
2559
  @param ignore_secondaries: if true, errors on secondary nodes
2560
      won't result in an error return from the function
2561
  @return: False if the operation failed, otherwise a list of
2562
      (host, instance_visible_name, node_visible_name)
2563
      with the mapping from node devices to instance devices
2564

2565
  """
2566
  device_info = []
2567
  disks_ok = True
2568
  iname = instance.name
2569
  # With the two-pass mechanism we try to reduce the window of
2570
  # opportunity for the race condition of switching DRBD to primary
2571
  # before handshaking has occurred, but we do not eliminate it
2572

    
2573
  # The proper fix would be to wait (with some limits) until the
2574
  # connection has been made and drbd transitions from WFConnection
2575
  # into any other network-connected state (Connected, SyncTarget,
2576
  # SyncSource, etc.)
2577

    
2578
  # 1st pass, assemble on all nodes in secondary mode
2579
  for inst_disk in instance.disks:
2580
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2581
      lu.cfg.SetDiskID(node_disk, node)
2582
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2583
      msg = result.RemoteFailMsg()
2584
      if msg:
2585
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2586
                           " (is_primary=False, pass=1): %s",
2587
                           inst_disk.iv_name, node, msg)
2588
        if not ignore_secondaries:
2589
          disks_ok = False
2590

    
2591
  # FIXME: race condition on drbd migration to primary
2592

    
2593
  # 2nd pass, do only the primary node
2594
  for inst_disk in instance.disks:
2595
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2596
      if node != instance.primary_node:
2597
        continue
2598
      lu.cfg.SetDiskID(node_disk, node)
2599
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2600
      msg = result.RemoteFailMsg()
2601
      if msg:
2602
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2603
                           " (is_primary=True, pass=2): %s",
2604
                           inst_disk.iv_name, node, msg)
2605
        disks_ok = False
2606
    device_info.append((instance.primary_node, inst_disk.iv_name,
2607
                        result.payload))
2608

    
2609
  # leave the disks configured for the primary node
2610
  # this is a workaround that would be fixed better by
2611
  # improving the logical/physical id handling
2612
  for disk in instance.disks:
2613
    lu.cfg.SetDiskID(disk, instance.primary_node)
2614

    
2615
  return disks_ok, device_info
2616

    
2617

    
2618
def _StartInstanceDisks(lu, instance, force):
2619
  """Start the disks of an instance.
2620

2621
  """
2622
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2623
                                           ignore_secondaries=force)
2624
  if not disks_ok:
2625
    _ShutdownInstanceDisks(lu, instance)
2626
    if force is not None and not force:
2627
      lu.proc.LogWarning("", hint="If the message above refers to a"
2628
                         " secondary node,"
2629
                         " you can retry the operation using '--force'.")
2630
    raise errors.OpExecError("Disk consistency error")
2631

    
2632

    
2633
class LUDeactivateInstanceDisks(NoHooksLU):
2634
  """Shutdown an instance's disks.
2635

2636
  """
2637
  _OP_REQP = ["instance_name"]
2638
  REQ_BGL = False
2639

    
2640
  def ExpandNames(self):
2641
    self._ExpandAndLockInstance()
2642
    self.needed_locks[locking.LEVEL_NODE] = []
2643
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2644

    
2645
  def DeclareLocks(self, level):
2646
    if level == locking.LEVEL_NODE:
2647
      self._LockInstancesNodes()
2648

    
2649
  def CheckPrereq(self):
2650
    """Check prerequisites.
2651

2652
    This checks that the instance is in the cluster.
2653

2654
    """
2655
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2656
    assert self.instance is not None, \
2657
      "Cannot retrieve locked instance %s" % self.op.instance_name
2658

    
2659
  def Exec(self, feedback_fn):
2660
    """Deactivate the disks
2661

2662
    """
2663
    instance = self.instance
2664
    _SafeShutdownInstanceDisks(self, instance)
2665

    
2666

    
2667
def _SafeShutdownInstanceDisks(lu, instance):
2668
  """Shutdown block devices of an instance.
2669

2670
  This function checks if an instance is running, before calling
2671
  _ShutdownInstanceDisks.
2672

2673
  """
2674
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
2675
                                      [instance.hypervisor])
2676
  ins_l = ins_l[instance.primary_node]
2677
  if ins_l.failed or not isinstance(ins_l.data, list):
2678
    raise errors.OpExecError("Can't contact node '%s'" %
2679
                             instance.primary_node)
2680

    
2681
  if instance.name in ins_l.data:
2682
    raise errors.OpExecError("Instance is running, can't shutdown"
2683
                             " block devices.")
2684

    
2685
  _ShutdownInstanceDisks(lu, instance)
2686

    
2687

    
2688
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2689
  """Shutdown block devices of an instance.
2690

2691
  This does the shutdown on all nodes of the instance.
2692

2693
  If ignore_primary is true, errors on the primary node are
2694
  ignored.
2695

2696
  """
2697
  all_result = True
2698
  for disk in instance.disks:
2699
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2700
      lu.cfg.SetDiskID(top_disk, node)
2701
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2702
      msg = result.RemoteFailMsg()
2703
      if msg:
2704
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2705
                      disk.iv_name, node, msg)
2706
        if not ignore_primary or node != instance.primary_node:
2707
          all_result = False
2708
  return all_result
2709

    
2710

    
2711
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2712
  """Checks if a node has enough free memory.
2713

2714
  This function checks if a given node has the needed amount of free
2715
  memory. In case the node has less memory or we cannot get the
2716
  information from the node, this function raises an OpPrereqError
2717
  exception.
2718

2719
  @type lu: C{LogicalUnit}
2720
  @param lu: a logical unit from which we get configuration data
2721
  @type node: C{str}
2722
  @param node: the node to check
2723
  @type reason: C{str}
2724
  @param reason: string to use in the error message
2725
  @type requested: C{int}
2726
  @param requested: the amount of memory in MiB to check for
2727
  @type hypervisor_name: C{str}
2728
  @param hypervisor_name: the hypervisor to ask for memory stats
2729
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2730
      we cannot check the node
2731

2732
  """
2733
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2734
  nodeinfo[node].Raise()
2735
  free_mem = nodeinfo[node].data.get('memory_free')
2736
  if not isinstance(free_mem, int):
2737
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2738
                             " was '%s'" % (node, free_mem))
2739
  if requested > free_mem:
2740
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2741
                             " needed %s MiB, available %s MiB" %
2742
                             (node, reason, requested, free_mem))
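
# Typical use, as seen in LUStartupInstance.CheckPrereq below: verify that
# the instance's primary node can hold its configured memory before the
# instance is started, e.g.:
#   _CheckNodeFreeMemory(self, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        bep[constants.BE_MEMORY], instance.hypervisor)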
2743

    
2744

    
2745
class LUStartupInstance(LogicalUnit):
2746
  """Starts an instance.
2747

2748
  """
2749
  HPATH = "instance-start"
2750
  HTYPE = constants.HTYPE_INSTANCE
2751
  _OP_REQP = ["instance_name", "force"]
2752
  REQ_BGL = False
2753

    
2754
  def ExpandNames(self):
2755
    self._ExpandAndLockInstance()
2756

    
2757
  def BuildHooksEnv(self):
2758
    """Build hooks env.
2759

2760
    This runs on master, primary and secondary nodes of the instance.
2761

2762
    """
2763
    env = {
2764
      "FORCE": self.op.force,
2765
      }
2766
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2767
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2768
    return env, nl, nl
2769

    
2770
  def CheckPrereq(self):
2771
    """Check prerequisites.
2772

2773
    This checks that the instance is in the cluster.
2774

2775
    """
2776
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2777
    assert self.instance is not None, \
2778
      "Cannot retrieve locked instance %s" % self.op.instance_name
2779

    
2780
    # extra beparams
2781
    self.beparams = getattr(self.op, "beparams", {})
2782
    if self.beparams:
2783
      if not isinstance(self.beparams, dict):
2784
        raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2785
                                   " dict" % (type(self.beparams), ))
2786
      # fill the beparams dict
2787
      utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2788
      self.op.beparams = self.beparams
2789

    
2790
    # extra hvparams
2791
    self.hvparams = getattr(self.op, "hvparams", {})
2792
    if self.hvparams:
2793
      if not isinstance(self.hvparams, dict):
2794
        raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2795
                                   " dict" % (type(self.hvparams), ))
2796

    
2797
      # check hypervisor parameter syntax (locally)
2798
      cluster = self.cfg.GetClusterInfo()
2799
      utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2800
      filled_hvp = cluster.FillDict(cluster.hvparams[instance.hypervisor],
2801
                                    instance.hvparams)
2802
      filled_hvp.update(self.hvparams)
2803
      hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2804
      hv_type.CheckParameterSyntax(filled_hvp)
2805
      _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2806
      self.op.hvparams = self.hvparams
2807

    
2808
    _CheckNodeOnline(self, instance.primary_node)
2809

    
2810
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2811
    # check bridges existence
2812
    _CheckInstanceBridgesExist(self, instance)
2813

    
2814
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2815
                                              instance.name,
2816
                                              instance.hypervisor)
2817
    remote_info.Raise()
2818
    if not remote_info.data:
2819
      _CheckNodeFreeMemory(self, instance.primary_node,
2820
                           "starting instance %s" % instance.name,
2821
                           bep[constants.BE_MEMORY], instance.hypervisor)
2822

    
2823
  def Exec(self, feedback_fn):
2824
    """Start the instance.
2825

2826
    """
2827
    instance = self.instance
2828
    force = self.op.force
2829

    
2830
    self.cfg.MarkInstanceUp(instance.name)
2831

    
2832
    node_current = instance.primary_node
2833

    
2834
    _StartInstanceDisks(self, instance, force)
2835

    
2836
    result = self.rpc.call_instance_start(node_current, instance,
2837
                                          self.hvparams, self.beparams)
2838
    msg = result.RemoteFailMsg()
2839
    if msg:
2840
      _ShutdownInstanceDisks(self, instance)
2841
      raise errors.OpExecError("Could not start instance: %s" % msg)
2842

    
2843

    
2844
class LURebootInstance(LogicalUnit):
2845
  """Reboot an instance.
2846

2847
  """
2848
  HPATH = "instance-reboot"
2849
  HTYPE = constants.HTYPE_INSTANCE
2850
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2851
  REQ_BGL = False
2852

    
2853
  def ExpandNames(self):
2854
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2855
                                   constants.INSTANCE_REBOOT_HARD,
2856
                                   constants.INSTANCE_REBOOT_FULL]:
2857
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2858
                                  (constants.INSTANCE_REBOOT_SOFT,
2859
                                   constants.INSTANCE_REBOOT_HARD,
2860
                                   constants.INSTANCE_REBOOT_FULL))
2861
    self._ExpandAndLockInstance()
2862

    
2863
  def BuildHooksEnv(self):
2864
    """Build hooks env.
2865

2866
    This runs on master, primary and secondary nodes of the instance.
2867

2868
    """
2869
    env = {
2870
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2871
      "REBOOT_TYPE": self.op.reboot_type,
2872
      }
2873
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2874
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2875
    return env, nl, nl
2876

    
2877
  def CheckPrereq(self):
2878
    """Check prerequisites.
2879

2880
    This checks that the instance is in the cluster.
2881

2882
    """
2883
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2884
    assert self.instance is not None, \
2885
      "Cannot retrieve locked instance %s" % self.op.instance_name
2886

    
2887
    _CheckNodeOnline(self, instance.primary_node)
2888

    
2889
    # check bridges existence
2890
    _CheckInstanceBridgesExist(self, instance)
2891

    
2892
  def Exec(self, feedback_fn):
2893
    """Reboot the instance.
2894

2895
    """
2896
    instance = self.instance
2897
    ignore_secondaries = self.op.ignore_secondaries
2898
    reboot_type = self.op.reboot_type
2899

    
2900
    node_current = instance.primary_node
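    # soft and hard reboots are handled by the hypervisor in a single RPC
    # call; a full reboot is emulated below as an instance shutdown followed
    # by a disk restart and a fresh instance start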
2901

    
2902
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2903
                       constants.INSTANCE_REBOOT_HARD]:
2904
      for disk in instance.disks:
2905
        self.cfg.SetDiskID(disk, node_current)
2906
      result = self.rpc.call_instance_reboot(node_current, instance,
2907
                                             reboot_type)
2908
      msg = result.RemoteFailMsg()
2909
      if msg:
2910
        raise errors.OpExecError("Could not reboot instance: %s" % msg)
2911
    else:
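      # INSTANCE_REBOOT_FULL: emulate the reboot by shutting the instance
      # down, restarting its disks and starting it again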
2912
      result = self.rpc.call_instance_shutdown(node_current, instance)
2913
      msg = result.RemoteFailMsg()
2914
      if msg:
2915
        raise errors.OpExecError("Could not shutdown instance for"
2916
                                 " full reboot: %s" % msg)
2917
      _ShutdownInstanceDisks(self, instance)
2918
      _StartInstanceDisks(self, instance, ignore_secondaries)
2919
      result = self.rpc.call_instance_start(node_current, instance, None, None)
2920
      msg = result.RemoteFailMsg()
2921
      if msg:
2922
        _ShutdownInstanceDisks(self, instance)
2923
        raise errors.OpExecError("Could not start instance for"
2924
                                 " full reboot: %s" % msg)
2925

    
2926
    self.cfg.MarkInstanceUp(instance.name)
2927

    
2928

    
2929
class LUShutdownInstance(LogicalUnit):
2930
  """Shutdown an instance.
2931

2932
  """
2933
  HPATH = "instance-stop"
2934
  HTYPE = constants.HTYPE_INSTANCE
2935
  _OP_REQP = ["instance_name"]
2936
  REQ_BGL = False
2937

    
2938
  def ExpandNames(self):
2939
    self._ExpandAndLockInstance()
2940

    
2941
  def BuildHooksEnv(self):
2942
    """Build hooks env.
2943

2944
    This runs on master, primary and secondary nodes of the instance.
2945

2946
    """
2947
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2948
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2949
    return env, nl, nl
2950

    
2951
  def CheckPrereq(self):
2952
    """Check prerequisites.
2953

2954
    This checks that the instance is in the cluster.
2955

2956
    """
2957
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2958
    assert self.instance is not None, \
2959
      "Cannot retrieve locked instance %s" % self.op.instance_name
2960
    _CheckNodeOnline(self, self.instance.primary_node)
2961

    
2962
  def Exec(self, feedback_fn):
2963
    """Shutdown the instance.
2964

2965
    """
2966
    instance = self.instance
2967
    node_current = instance.primary_node
2968
    self.cfg.MarkInstanceDown(instance.name)
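    # the instance is marked down in the configuration before the shutdown
    # RPC is issued; a failed shutdown is only reported as a warning below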
2969
    result = self.rpc.call_instance_shutdown(node_current, instance)
2970
    msg = result.RemoteFailMsg()
2971
    if msg:
2972
      self.proc.LogWarning("Could not shutdown instance: %s" % msg)
2973

    
2974
    _ShutdownInstanceDisks(self, instance)
2975

    
2976

    
2977
class LUReinstallInstance(LogicalUnit):
2978
  """Reinstall an instance.
2979

2980
  """
2981
  HPATH = "instance-reinstall"
2982
  HTYPE = constants.HTYPE_INSTANCE
2983
  _OP_REQP = ["instance_name"]
2984
  REQ_BGL = False
2985

    
2986
  def ExpandNames(self):
2987
    self._ExpandAndLockInstance()
2988

    
2989
  def BuildHooksEnv(self):
2990
    """Build hooks env.
2991

2992
    This runs on master, primary and secondary nodes of the instance.
2993

2994
    """
2995
    env = _BuildInstanceHookEnvByObject(self, self.instance)
2996
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2997
    return env, nl, nl
2998

    
2999
  def CheckPrereq(self):
3000
    """Check prerequisites.
3001

3002
    This checks that the instance is in the cluster and is not running.
3003

3004
    """
3005
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3006
    assert instance is not None, \
3007
      "Cannot retrieve locked instance %s" % self.op.instance_name
3008
    _CheckNodeOnline(self, instance.primary_node)
3009

    
3010
    if instance.disk_template == constants.DT_DISKLESS:
3011
      raise errors.OpPrereqError("Instance '%s' has no disks" %
3012
                                 self.op.instance_name)
3013
    if instance.admin_up:
3014
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3015
                                 self.op.instance_name)
3016
    remote_info = self.rpc.call_instance_info(instance.primary_node,
3017
                                              instance.name,
3018
                                              instance.hypervisor)
3019
    remote_info.Raise()
3020
    if remote_info.data:
3021
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3022
                                 (self.op.instance_name,
3023
                                  instance.primary_node))
3024

    
3025
    self.op.os_type = getattr(self.op, "os_type", None)
3026
    if self.op.os_type is not None:
3027
      # OS verification
3028
      pnode = self.cfg.GetNodeInfo(
3029
        self.cfg.ExpandNodeName(instance.primary_node))
3030
      if pnode is None:
3031
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
3032
                                   instance.primary_node)
3033
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3034
      result.Raise()
3035
      if not isinstance(result.data, objects.OS):
3036
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3037
                                   " primary node"  % self.op.os_type)
3038

    
3039
    self.instance = instance
3040

    
3041
  def Exec(self, feedback_fn):
3042
    """Reinstall the instance.
3043

3044
    """
3045
    inst = self.instance
3046

    
3047
    if self.op.os_type is not None:
3048
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3049
      inst.os = self.op.os_type
3050
      self.cfg.Update(inst)
3051

    
3052
    _StartInstanceDisks(self, inst, None)
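    # the instance's disks are activated for the OS create scripts and
    # shut down again in the finally clause below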
3053
    try:
3054
      feedback_fn("Running the instance OS create scripts...")
3055
      result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3056
      msg = result.RemoteFailMsg()
3057
      if msg:
3058
        raise errors.OpExecError("Could not install OS for instance %s"
3059
                                 " on node %s: %s" %
3060
                                 (inst.name, inst.primary_node, msg))
3061
    finally:
3062
      _ShutdownInstanceDisks(self, inst)
3063

    
3064

    
3065
class LURenameInstance(LogicalUnit):
3066
  """Rename an instance.
3067

3068
  """
3069
  HPATH = "instance-rename"
3070
  HTYPE = constants.HTYPE_INSTANCE
3071
  _OP_REQP = ["instance_name", "new_name"]
3072

    
3073
  def BuildHooksEnv(self):
3074
    """Build hooks env.
3075

3076
    This runs on master, primary and secondary nodes of the instance.
3077

3078
    """
3079
    env = _BuildInstanceHookEnvByObject(self, self.instance)
3080
    env["INSTANCE_NEW_NAME"] = self.op.new_name
3081
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3082
    return env, nl, nl
3083

    
3084
  def CheckPrereq(self):
3085
    """Check prerequisites.
3086

3087
    This checks that the instance is in the cluster and is not running.
3088

3089
    """
3090
    instance = self.cfg.GetInstanceInfo(
3091
      self.cfg.ExpandInstanceName(self.op.instance_name))
3092
    if instance is None:
3093
      raise errors.OpPrereqError("Instance '%s' not known" %
3094
                                 self.op.instance_name)
3095
    _CheckNodeOnline(self, instance.primary_node)
3096

    
3097
    if instance.admin_up:
3098
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3099
                                 self.op.instance_name)
3100
    remote_info = self.rpc.call_instance_info(instance.primary_node,
3101
                                              instance.name,
3102
                                              instance.hypervisor)
3103
    remote_info.Raise()
3104
    if remote_info.data:
3105
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3106
                                 (self.op.instance_name,
3107
                                  instance.primary_node))
3108
    self.instance = instance
3109

    
3110
    # new name verification
3111
    name_info = utils.HostInfo(self.op.new_name)
3112

    
3113
    self.op.new_name = new_name = name_info.name
3114
    instance_list = self.cfg.GetInstanceList()
3115
    if new_name in instance_list:
3116
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3117
                                 new_name)
3118

    
3119
    if not getattr(self.op, "ignore_ip", False):
3120
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3121
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3122
                                   (name_info.ip, new_name))
3123

    
3124

    
3125
  def Exec(self, feedback_fn):
3126
    """Reinstall the instance.
3127

3128
    """
3129
    inst = self.instance
3130
    old_name = inst.name
3131

    
3132
    if inst.disk_template == constants.DT_FILE:
3133
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3134

    
3135
    self.cfg.RenameInstance(inst.name, self.op.new_name)
3136
    # Change the instance lock. This is definitely safe while we hold the BGL
3137
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3138
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3139

    
3140
    # re-read the instance from the configuration after rename
3141
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
3142

    
3143
    if inst.disk_template == constants.DT_FILE:
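      # file-based instances also need their storage directory renamed
      # on the primary node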
3144
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3145
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3146
                                                     old_file_storage_dir,
3147
                                                     new_file_storage_dir)
3148
      result.Raise()
3149
      if not result.data:
3150
        raise errors.OpExecError("Could not connect to node '%s' to rename"
3151
                                 " directory '%s' to '%s' (but the instance"
3152
                                 " has been renamed in Ganeti)" % (
3153
                                 inst.primary_node, old_file_storage_dir,
3154
                                 new_file_storage_dir))
3155

    
3156
      if not result.data[0]:
3157
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3158
                                 " (but the instance has been renamed in"
3159
                                 " Ganeti)" % (old_file_storage_dir,
3160
                                               new_file_storage_dir))
3161

    
3162
    _StartInstanceDisks(self, inst, None)
3163
    try:
3164
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3165
                                                 old_name)
3166
      msg = result.RemoteFailMsg()
3167
      if msg:
3168
        msg = ("Could not run OS rename script for instance %s on node %s"
3169
               " (but the instance has been renamed in Ganeti): %s" %
3170
               (inst.name, inst.primary_node, msg))
3171
        self.proc.LogWarning(msg)
3172
    finally:
3173
      _ShutdownInstanceDisks(self, inst)
3174

    
3175

    
3176
class LURemoveInstance(LogicalUnit):
3177
  """Remove an instance.
3178

3179
  """
3180
  HPATH = "instance-remove"
3181
  HTYPE = constants.HTYPE_INSTANCE
3182
  _OP_REQP = ["instance_name", "ignore_failures"]
3183
  REQ_BGL = False
3184

    
3185
  def ExpandNames(self):
3186
    self._ExpandAndLockInstance()
3187
    self.needed_locks[locking.LEVEL_NODE] = []
3188
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3189

    
3190
  def DeclareLocks(self, level):
3191
    if level == locking.LEVEL_NODE:
3192
      self._LockInstancesNodes()
3193

    
3194
  def BuildHooksEnv(self):
3195
    """Build hooks env.
3196

3197
    This runs on master, primary and secondary nodes of the instance.
3198

3199
    """
3200
    env = _BuildInstanceHookEnvByObject(self, self.instance)
3201
    nl = [self.cfg.GetMasterNode()]
3202
    return env, nl, nl
3203

    
3204
  def CheckPrereq(self):
3205
    """Check prerequisites.
3206

3207
    This checks that the instance is in the cluster.
3208

3209
    """
3210
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3211
    assert self.instance is not None, \
3212
      "Cannot retrieve locked instance %s" % self.op.instance_name
3213

    
3214
  def Exec(self, feedback_fn):
3215
    """Remove the instance.
3216

3217
    """
3218
    instance = self.instance
3219
    logging.info("Shutting down instance %s on node %s",
3220
                 instance.name, instance.primary_node)
3221

    
3222
    result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3223
    msg = result.RemoteFailMsg()
3224
    if msg:
3225
      if self.op.ignore_failures:
3226
        feedback_fn("Warning: can't shutdown instance: %s" % msg)
3227
      else:
3228
        raise errors.OpExecError("Could not shutdown instance %s on"
3229
                                 " node %s: %s" %
3230
                                 (instance.name, instance.primary_node, msg))
3231

    
3232
    logging.info("Removing block devices for instance %s", instance.name)
3233

    
3234
    if not _RemoveDisks(self, instance):
3235
      if self.op.ignore_failures:
3236
        feedback_fn("Warning: can't remove instance's disks")
3237
      else:
3238
        raise errors.OpExecError("Can't remove instance's disks")
3239

    
3240
    logging.info("Removing instance %s out of cluster config", instance.name)
3241

    
3242
    self.cfg.RemoveInstance(instance.name)
3243
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3244

    
3245

    
3246
class LUQueryInstances(NoHooksLU):
3247
  """Logical unit for querying instances.
3248

3249
  """
3250
  _OP_REQP = ["output_fields", "names", "use_locking"]
3251
  REQ_BGL = False
3252
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3253
                                    "admin_state",
3254
                                    "disk_template", "ip", "mac", "bridge",
3255
                                    "sda_size", "sdb_size", "vcpus", "tags",
3256
                                    "network_port", "beparams",
3257
                                    r"(disk)\.(size)/([0-9]+)",
3258
                                    r"(disk)\.(sizes)", "disk_usage",
3259
                                    r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3260
                                    r"(nic)\.(macs|ips|bridges)",
3261
                                    r"(disk|nic)\.(count)",
3262
                                    "serial_no", "hypervisor", "hvparams",] +
3263
                                  ["hv/%s" % name
3264
                                   for name in constants.HVS_PARAMETERS] +
3265
                                  ["be/%s" % name
3266
                                   for name in constants.BES_PARAMETERS])
3267
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
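  # dynamic fields require live data from the nodes, while static fields
  # are answered from the configuration alone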
3268

    
3269

    
3270
  def ExpandNames(self):
3271
    _CheckOutputFields(static=self._FIELDS_STATIC,
3272
                       dynamic=self._FIELDS_DYNAMIC,
3273
                       selected=self.op.output_fields)
3274

    
3275
    self.needed_locks = {}
3276
    self.share_locks[locking.LEVEL_INSTANCE] = 1
3277
    self.share_locks[locking.LEVEL_NODE] = 1
3278

    
3279
    if self.op.names:
3280
      self.wanted = _GetWantedInstances(self, self.op.names)
3281
    else:
3282
      self.wanted = locking.ALL_SET
3283

    
3284
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3285
    self.do_locking = self.do_node_query and self.op.use_locking
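    # locking is only needed if live (dynamic) data was requested and the
    # caller explicitly asked for it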
3286
    if self.do_locking:
3287
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3288
      self.needed_locks[locking.LEVEL_NODE] = []
3289
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3290

    
3291
  def DeclareLocks(self, level):
3292
    if level == locking.LEVEL_NODE and self.do_locking:
3293
      self._LockInstancesNodes()
3294

    
3295
  def CheckPrereq(self):
3296
    """Check prerequisites.
3297

3298
    """
3299
    pass
3300

    
3301
  def Exec(self, feedback_fn):
3302
    """Computes the list of nodes and their attributes.
3303

3304
    """
3305
    all_info = self.cfg.GetAllInstancesInfo()
3306
    if self.wanted == locking.ALL_SET:
3307
      # caller didn't specify instance names, so ordering is not important
3308
      if self.do_locking:
3309
        instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3310
      else:
3311
        instance_names = all_info.keys()
3312
      instance_names = utils.NiceSort(instance_names)
3313
    else:
3314
      # caller did specify names, so we must keep the ordering
3315
      if self.do_locking:
3316
        tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3317
      else:
3318
        tgt_set = all_info.keys()
3319
      missing = set(self.wanted).difference(tgt_set)
3320
      if missing:
3321
        raise errors.OpExecError("Some instances were removed before"
3322
                                 " retrieving their data: %s" % missing)
3323
      instance_names = self.wanted
3324

    
3325
    instance_list = [all_info[iname] for iname in instance_names]
3326

    
3327
    # begin data gathering
3328

    
3329
    nodes = frozenset([inst.primary_node for inst in instance_list])
3330
    hv_list = list(set([inst.hypervisor for inst in instance_list]))
3331

    
3332
    bad_nodes = []
3333
    off_nodes = []
3334
    if self.do_node_query:
3335
      live_data = {}
3336
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3337
      for name in nodes:
3338
        result = node_data[name]
3339
        if result.offline:
3340
          # offline nodes will be in both lists
3341
          off_nodes.append(name)
3342
        if result.failed:
3343
          bad_nodes.append(name)
3344
        else:
3345
          if result.data:
3346
            live_data.update(result.data)
3347
            # else no instance is alive
3348
    else:
3349
      live_data = dict([(name, {}) for name in instance_names])
3350

    
3351
    # end data gathering
3352

    
3353
    HVPREFIX = "hv/"
3354
    BEPREFIX = "be/"
3355
    output = []
3356
    for instance in instance_list:
3357
      iout = []
3358
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3359
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
3360
      for field in self.op.output_fields:
3361
        st_match = self._FIELDS_STATIC.Matches(field)
3362
        if field == "name":
3363
          val = instance.name
3364
        elif field == "os":
3365
          val = instance.os
3366
        elif field == "pnode":
3367
          val = instance.primary_node
3368
        elif field == "snodes":
3369
          val = list(instance.secondary_nodes)
3370
        elif field == "admin_state":
3371
          val = instance.admin_up
3372
        elif field == "oper_state":
3373
          if instance.primary_node in bad_nodes:
3374
            val = None
3375
          else:
3376
            val = bool(live_data.get(instance.name))
3377
        elif field == "status":
3378
          if instance.primary_node in off_nodes:
3379
            val = "ERROR_nodeoffline"
3380
          elif instance.primary_node in bad_nodes:
3381
            val = "ERROR_nodedown"
3382
          else:
3383
            running = bool(live_data.get(instance.name))
3384
            if running:
3385
              if instance.admin_up:
3386
                val = "running"
3387
              else:
3388
                val = "ERROR_up"
3389
            else:
3390
              if instance.admin_up:
3391
                val = "ERROR_down"
3392
              else:
3393
                val = "ADMIN_down"
3394
        elif field == "oper_ram":
3395
          if instance.primary_node in bad_nodes:
3396
            val = None
3397
          elif instance.name in live_data:
3398
            val = live_data[instance.name].get("memory", "?")
3399
          else:
3400
            val = "-"
3401
        elif field == "disk_template":
3402
          val = instance.disk_template
3403
        elif field == "ip":
3404
          val = instance.nics[0].ip
3405
        elif field == "bridge":
3406
          val = instance.nics[0].bridge
3407
        elif field == "mac":
3408
          val = instance.nics[0].mac
3409
        elif field == "sda_size" or field == "sdb_size":
3410
          idx = ord(field[2]) - ord('a')
3411
          try:
3412
            val = instance.FindDisk(idx).size
3413
          except errors.OpPrereqError:
3414
            val = None
3415
        elif field == "disk_usage": # total disk usage per node
3416
          disk_sizes = [{'size': disk.size} for disk in instance.disks]
3417
          val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3418
        elif field == "tags":
3419
          val = list(instance.GetTags())
3420
        elif field == "serial_no":
3421
          val = instance.serial_no
3422
        elif field == "network_port":
3423
          val = instance.network_port
3424
        elif field == "hypervisor":
3425
          val = instance.hypervisor
3426
        elif field == "hvparams":
3427
          val = i_hv
3428
        elif (field.startswith(HVPREFIX) and
3429
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3430
          val = i_hv.get(field[len(HVPREFIX):], None)
3431
        elif field == "beparams":
3432
          val = i_be
3433
        elif (field.startswith(BEPREFIX) and
3434
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3435
          val = i_be.get(field[len(BEPREFIX):], None)
3436
        elif st_match and st_match.groups():
3437
          # matches a variable list
3438
          st_groups = st_match.groups()
3439
          if st_groups and st_groups[0] == "disk":
3440
            if st_groups[1] == "count":
3441
              val = len(instance.disks)
3442
            elif st_groups[1] == "sizes":
3443
              val = [disk.size for disk in instance.disks]
3444
            elif st_groups[1] == "size":
3445
              try:
3446
                val = instance.FindDisk(st_groups[2]).size
3447
              except errors.OpPrereqError:
3448
                val = None
3449
            else:
3450
              assert False, "Unhandled disk parameter"
3451
          elif st_groups[0] == "nic":
3452
            if st_groups[1] == "count":
3453
              val = len(instance.nics)
3454
            elif st_groups[1] == "macs":
3455
              val = [nic.mac for nic in instance.nics]
3456
            elif st_groups[1] == "ips":
3457
              val = [nic.ip for nic in instance.nics]
3458
            elif st_groups[1] == "bridges":
3459
              val = [nic.bridge for nic in instance.nics]
3460
            else:
3461
              # index-based item
3462
              nic_idx = int(st_groups[2])
3463
              if nic_idx >= len(instance.nics):
3464
                val = None
3465
              else:
3466
                if st_groups[1] == "mac":
3467
                  val = instance.nics[nic_idx].mac
3468
                elif st_groups[1] == "ip":
3469
                  val = instance.nics[nic_idx].ip
3470
                elif st_groups[1] == "bridge":
3471
                  val = instance.nics[nic_idx].bridge
3472
                else:
3473
                  assert False, "Unhandled NIC parameter"
3474
          else:
3475
            assert False, "Unhandled variable parameter"
3476
        else:
3477
          raise errors.ParameterError(field)
3478
        iout.append(val)
3479
      output.append(iout)
3480

    
3481
    return output
3482

    
3483

    
3484
class LUFailoverInstance(LogicalUnit):
3485
  """Failover an instance.
3486

3487
  """
3488
  HPATH = "instance-failover"
3489
  HTYPE = constants.HTYPE_INSTANCE
3490
  _OP_REQP = ["instance_name", "ignore_consistency"]
3491
  REQ_BGL = False
3492

    
3493
  def ExpandNames(self):
3494
    self._ExpandAndLockInstance()
3495
    self.needed_locks[locking.LEVEL_NODE] = []
3496
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3497

    
3498
  def DeclareLocks(self, level):
3499
    if level == locking.LEVEL_NODE:
3500
      self._LockInstancesNodes()
3501

    
3502
  def BuildHooksEnv(self):
3503
    """Build hooks env.
3504

3505
    This runs on master, primary and secondary nodes of the instance.
3506

3507
    """
3508
    env = {
3509
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3510
      }
3511
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3512
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3513
    return env, nl, nl
3514

    
3515
  def CheckPrereq(self):
3516
    """Check prerequisites.
3517

3518
    This checks that the instance is in the cluster.
3519

3520
    """
3521
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3522
    assert self.instance is not None, \
3523
      "Cannot retrieve locked instance %s" % self.op.instance_name
3524

    
3525
    bep = self.cfg.GetClusterInfo().FillBE(instance)
3526
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3527
      raise errors.OpPrereqError("Instance's disk layout is not"
3528
                                 " network mirrored, cannot failover.")
3529

    
3530
    secondary_nodes = instance.secondary_nodes
3531
    if not secondary_nodes:
3532
      raise errors.ProgrammerError("no secondary node but using "
3533
                                   "a mirrored disk template")
3534

    
3535
    target_node = secondary_nodes[0]
3536
    _CheckNodeOnline(self, target_node)
3537
    _CheckNodeNotDrained(self, target_node)
3538
    # check memory requirements on the secondary node
3539
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3540
                         instance.name, bep[constants.BE_MEMORY],
3541
                         instance.hypervisor)
3542

    
3543
    # check bridge existence
3544
    brlist = [nic.bridge for nic in instance.nics]
3545
    result = self.rpc.call_bridges_exist(target_node, brlist)
3546
    result.Raise()
3547
    if not result.data:
3548
      raise errors.OpPrereqError("One or more target bridges %s does not"
3549
                                 " exist on destination node '%s'" %
3550
                                 (brlist, target_node))
3551

    
3552
  def Exec(self, feedback_fn):
3553
    """Failover an instance.
3554

3555
    The failover is done by shutting it down on its present node and
3556
    starting it on the secondary.
3557

3558
    """
3559
    instance = self.instance
3560

    
3561
    source_node = instance.primary_node
3562
    target_node = instance.secondary_nodes[0]
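    # the failover target is always the instance's (single) secondary node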
3563

    
3564
    feedback_fn("* checking disk consistency between source and target")
3565
    for dev in instance.disks:
3566
      # for drbd, these are drbd over lvm
3567
      if not _CheckDiskConsistency(self, dev, target_node, False):
3568
        if instance.admin_up and not self.op.ignore_consistency:
3569
          raise errors.OpExecError("Disk %s is degraded on target node,"
3570
                                   " aborting failover." % dev.iv_name)
3571

    
3572
    feedback_fn("* shutting down instance on source node")
3573
    logging.info("Shutting down instance %s on node %s",
3574
                 instance.name, source_node)
3575

    
3576
    result = self.rpc.call_instance_shutdown(source_node, instance)
3577
    msg = result.RemoteFailMsg()
3578
    if msg:
3579
      if self.op.ignore_consistency:
3580
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
3581
                             " Proceeding anyway. Please make sure node"
3582
                             " %s is down. Error details: %s",
3583
                             instance.name, source_node, source_node, msg)
3584
      else:
3585
        raise errors.OpExecError("Could not shutdown instance %s on"
3586
                                 " node %s: %s" %
3587
                                 (instance.name, source_node, msg))
3588

    
3589
    feedback_fn("* deactivating the instance's disks on source node")
3590
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3591
      raise errors.OpExecError("Can't shut down the instance's disks.")
3592

    
3593
    instance.primary_node = target_node
3594
    # distribute new instance config to the other nodes
3595
    self.cfg.Update(instance)
3596

    
3597
    # Only start the instance if it's marked as up
3598
    if instance.admin_up:
3599
      feedback_fn("* activating the instance's disks on target node")
3600
      logging.info("Starting instance %s on node %s",
3601
                   instance.name, target_node)
3602

    
3603
      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3604
                                               ignore_secondaries=True)
3605
      if not disks_ok:
3606
        _ShutdownInstanceDisks(self, instance)
3607
        raise errors.OpExecError("Can't activate the instance's disks")
3608

    
3609
      feedback_fn("* starting the instance on the target node")
3610
      result = self.rpc.call_instance_start(target_node, instance, None, None)
3611
      msg = result.RemoteFailMsg()
3612
      if msg:
3613
        _ShutdownInstanceDisks(self, instance)
3614
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3615
                                 (instance.name, target_node, msg))
3616

    
3617

    
3618
class LUMigrateInstance(LogicalUnit):
3619
  """Migrate an instance.
3620

3621
  This is migration without shutting down, compared to the failover,
3622
  which is done with shutdown.
3623

3624
  """
3625
  HPATH = "instance-migrate"
3626
  HTYPE = constants.HTYPE_INSTANCE
3627
  _OP_REQP = ["instance_name", "live", "cleanup"]
3628

    
3629
  REQ_BGL = False
3630

    
3631
  def ExpandNames(self):
3632
    self._ExpandAndLockInstance()
3633
    self.needed_locks[locking.LEVEL_NODE] = []
3634
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3635

    
3636
  def DeclareLocks(self, level):
3637
    if level == locking.LEVEL_NODE:
3638
      self._LockInstancesNodes()
3639

    
3640
  def BuildHooksEnv(self):
3641
    """Build hooks env.
3642

3643
    This runs on master, primary and secondary nodes of the instance.
3644

3645
    """
3646
    env = _BuildInstanceHookEnvByObject(self, self.instance)
3647
    env["MIGRATE_LIVE"] = self.op.live
3648
    env["MIGRATE_CLEANUP"] = self.op.cleanup
3649
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3650
    return env, nl, nl
3651

    
3652
  def CheckPrereq(self):
3653
    """Check prerequisites.
3654

3655
    This checks that the instance is in the cluster.
3656

3657
    """
3658
    instance = self.cfg.GetInstanceInfo(
3659
      self.cfg.ExpandInstanceName(self.op.instance_name))
3660
    if instance is None:
3661
      raise errors.OpPrereqError("Instance '%s' not known" %
3662
                                 self.op.instance_name)
3663

    
3664
    if instance.disk_template != constants.DT_DRBD8:
3665
      raise errors.OpPrereqError("Instance's disk layout is not"
3666
                                 " drbd8, cannot migrate.")
3667

    
3668
    secondary_nodes = instance.secondary_nodes
3669
    if not secondary_nodes:
3670
      raise errors.ConfigurationError("No secondary node but using"
3671
                                      " drbd8 disk template")
3672

    
3673
    i_be = self.cfg.GetClusterInfo().FillBE(instance)
3674

    
3675
    target_node = secondary_nodes[0]
3676
    # check memory requirements on the secondary node
3677
    _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3678
                         instance.name, i_be[constants.BE_MEMORY],
3679
                         instance.hypervisor)
3680

    
3681
    # check bridge existence
3682
    brlist = [nic.bridge for nic in instance.nics]
3683
    result = self.rpc.call_bridges_exist(target_node, brlist)
3684
    if result.failed or not result.data:
3685
      raise errors.OpPrereqError("One or more target bridges %s does not"
3686
                                 " exist on destination node '%s'" %
3687
                                 (brlist, target_node))
3688

    
3689
    if not self.op.cleanup:
3690
      _CheckNodeNotDrained(self, target_node)
3691
      result = self.rpc.call_instance_migratable(instance.primary_node,
3692
                                                 instance)
3693
      msg = result.RemoteFailMsg()
3694
      if msg:
3695
        raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3696
                                   msg)
3697

    
3698
    self.instance = instance
3699

    
3700
  def _WaitUntilSync(self):
3701
    """Poll with custom rpc for disk sync.
3702

3703
    This uses our own step-based rpc call.
3704

3705
    """
3706
    self.feedback_fn("* wait until resync is done")
3707
    all_done = False
3708
    while not all_done:
3709
      all_done = True
3710
      result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3711
                                            self.nodes_ip,
3712
                                            self.instance.disks)
3713
      min_percent = 100
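      # track the least-synchronized node, so progress reporting reflects
      # the slowest disk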
3714
      for node, nres in result.items():
3715
        msg = nres.RemoteFailMsg()
3716
        if msg:
3717
          raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3718
                                   (node, msg))
3719
        node_done, node_percent = nres.payload
3720
        all_done = all_done and node_done
3721
        if node_percent is not None:
3722
          min_percent = min(min_percent, node_percent)
3723
      if not all_done:
3724
        if min_percent < 100:
3725
          self.feedback_fn("   - progress: %.1f%%" % min_percent)
3726
        time.sleep(2)
3727

    
3728
  def _EnsureSecondary(self, node):
3729
    """Demote a node to secondary.
3730

3731
    """
3732
    self.feedback_fn("* switching node %s to secondary mode" % node)
3733

    
3734
    for dev in self.instance.disks:
3735
      self.cfg.SetDiskID(dev, node)
3736

    
3737
    result = self.rpc.call_blockdev_close(node, self.instance.name,
3738
                                          self.instance.disks)
3739
    msg = result.RemoteFailMsg()
3740
    if msg:
3741
      raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3742
                               " error %s" % (node, msg))
3743

    
3744
  def _GoStandalone(self):
3745
    """Disconnect from the network.
3746

3747
    """
3748
    self.feedback_fn("* changing into standalone mode")
3749
    result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3750
                                               self.instance.disks)
3751
    for node, nres in result.items():
3752
      msg = nres.RemoteFailMsg()
3753
      if msg:
3754
        raise errors.OpExecError("Cannot disconnect disks node %s,"
3755
                                 " error %s" % (node, msg))
3756

    
3757
  def _GoReconnect(self, multimaster):
3758
    """Reconnect to the network.
3759

3760
    """
3761
    if multimaster:
3762
      msg = "dual-master"
3763
    else:
3764
      msg = "single-master"
3765
    self.feedback_fn("* changing disks into %s mode" % msg)
3766
    result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3767
                                           self.instance.disks,
3768
                                           self.instance.name, multimaster)
3769
    for node, nres in result.items():
3770
      msg = nres.RemoteFailMsg()
3771
      if msg:
3772
        raise errors.OpExecError("Cannot change disks config on node %s,"
3773
                                 " error: %s" % (node, msg))
3774

    
3775
  def _ExecCleanup(self):
3776
    """Try to cleanup after a failed migration.
3777

3778
    The cleanup is done by:
3779
      - check that the instance is running only on one node
3780
        (and update the config if needed)
3781
      - change disks on its secondary node to secondary
3782
      - wait until disks are fully synchronized
3783
      - disconnect from the network
3784
      - change disks into single-master mode
3785
      - wait again until disks are fully synchronized
3786

3787
    """
3788
    instance = self.instance
3789
    target_node = self.target_node
3790
    source_node = self.source_node
3791

    
3792
    # check running on only one node
3793
    self.feedback_fn("* checking where the instance actually runs"
3794
                     " (if this hangs, the hypervisor might be in"
3795
                     " a bad state)")
3796
    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3797
    for node, result in ins_l.items():
3798
      result.Raise()
3799
      if not isinstance(result.data, list):
3800
        raise errors.OpExecError("Can't contact node '%s'" % node)
3801

    
3802
    runningon_source = instance.name in ins_l[source_node].data
3803
    runningon_target = instance.name in ins_l[target_node].data
3804

    
3805
    if runningon_source and runningon_target:
3806
      raise errors.OpExecError("Instance seems to be running on two nodes,"
3807
                               " or the hypervisor is confused. You will have"
3808
                               " to ensure manually that it runs only on one"
3809
                               " and restart this operation.")
3810

    
3811
    if not (runningon_source or runningon_target):
3812
      raise errors.OpExecError("Instance does not seem to be running at all."
3813
                               " In this case, it's safer to repair by"
3814
                               " running 'gnt-instance stop' to ensure disk"
3815
                               " shutdown, and then restarting it.")
3816

    
3817
    if runningon_target:
3818
      # the migration has actually succeeded, we need to update the config
3819
      self.feedback_fn("* instance running on secondary node (%s),"
3820
                       " updating config" % target_node)
3821
      instance.primary_node = target_node
3822
      self.cfg.Update(instance)
3823
      demoted_node = source_node
3824
    else:
3825
      self.feedback_fn("* instance confirmed to be running on its"
3826
                       " primary node (%s)" % source_node)
3827
      demoted_node = target_node
3828

    
3829
    self._EnsureSecondary(demoted_node)
3830
    try:
3831
      self._WaitUntilSync()
3832
    except errors.OpExecError:
3833
      # we ignore errors here, since if the device is standalone, it
3834
      # won't be able to sync
3835
      pass
3836
    self._GoStandalone()
3837
    self._GoReconnect(False)
3838
    self._WaitUntilSync()
3839

    
3840
    self.feedback_fn("* done")
3841

    
3842
  def _RevertDiskStatus(self):
3843
    """Try to revert the disk status after a failed migration.
3844

3845
    """
3846
    target_node = self.target_node
3847
    try:
3848
      self._EnsureSecondary(target_node)
3849
      self._GoStandalone()
3850
      self._GoReconnect(False)
3851
      self._WaitUntilSync()
3852
    except errors.OpExecError, err:
3853
      self.LogWarning("Migration failed and I can't reconnect the"
3854
                      " drives: error '%s'\n"
3855
                      "Please look and recover the instance status" %
3856
                      str(err))
3857

    
3858
  def _AbortMigration(self):
3859
    """Call the hypervisor code to abort a started migration.
3860

3861
    """
3862
    instance = self.instance
3863
    target_node = self.target_node
3864
    migration_info = self.migration_info
3865

    
3866
    abort_result = self.rpc.call_finalize_migration(target_node,
3867
                                                    instance,
3868
                                                    migration_info,
3869
                                                    False)
3870
    abort_msg = abort_result.RemoteFailMsg()
3871
    if abort_msg:
3872
      logging.error("Aborting migration failed on target node %s: %s" %
3873
                    (target_node, abort_msg))
3874
      # Don't raise an exception here, as we still have to try to revert the
3875
      # disk status, even if this step failed.
3876

    
3877
  def _ExecMigration(self):
3878
    """Migrate an instance.
3879

3880
    The migrate is done by:
3881
      - change the disks into dual-master mode
3882
      - wait until disks are fully synchronized again
3883
      - migrate the instance
3884
      - change disks on the new secondary node (the old primary) to secondary
3885
      - wait until disks are fully synchronized
3886
      - change disks into single-master mode
3887

3888
    """
3889
    instance = self.instance
3890
    target_node = self.target_node
3891
    source_node = self.source_node
3892

    
3893
    self.feedback_fn("* checking disk consistency between source and target")
3894
    for dev in instance.disks:
3895
      if not _CheckDiskConsistency(self, dev, target_node, False):
3896
        raise errors.OpExecError("Disk %s is degraded or not fully"
3897
                                 " synchronized on target node,"
3898
                                 " aborting migrate." % dev.iv_name)
3899

    
3900
    # First get the migration information from the remote node
3901
    result = self.rpc.call_migration_info(source_node, instance)
3902
    msg = result.RemoteFailMsg()
3903
    if msg:
3904
      log_err = ("Failed fetching source migration information from %s: %s" %
3905
                 (source_node, msg))
3906
      logging.error(log_err)
3907
      raise errors.OpExecError(log_err)
3908

    
3909
    self.migration_info = migration_info = result.payload
3910

    
3911
    # Then switch the disks to master/master mode
3912
    self._EnsureSecondary(target_node)
3913
    self._GoStandalone()
3914
    self._GoReconnect(True)
3915
    self._WaitUntilSync()
3916

    
3917
    self.feedback_fn("* preparing %s to accept the instance" % target_node)
3918
    result = self.rpc.call_accept_instance(target_node,
3919
                                           instance,
3920
                                           migration_info,
3921
                                           self.nodes_ip[target_node])
3922

    
3923
    msg = result.RemoteFailMsg()
3924
    if msg:
3925
      logging.error("Instance pre-migration failed, trying to revert"
3926
                    " disk status: %s", msg)
3927
      self._AbortMigration()
3928
      self._RevertDiskStatus()
3929
      raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
3930
                               (instance.name, msg))
3931

    
3932
    self.feedback_fn("* migrating instance to %s" % target_node)
3933
    time.sleep(10)
3934
    result = self.rpc.call_instance_migrate(source_node, instance,
3935
                                            self.nodes_ip[target_node],
3936
                                            self.op.live)
3937
    msg = result.RemoteFailMsg()
3938
    if msg:
3939
      logging.error("Instance migration failed, trying to revert"
3940
                    " disk status: %s", msg)
3941
      self._AbortMigration()
3942
      self._RevertDiskStatus()
3943
      raise errors.OpExecError("Could not migrate instance %s: %s" %
3944
                               (instance.name, msg))
3945
    time.sleep(10)
3946

    
3947
    instance.primary_node = target_node
3948
    # distribute new instance config to the other nodes
3949
    self.cfg.Update(instance)
3950

    
3951
    result = self.rpc.call_finalize_migration(target_node,
3952
                                              instance,
3953
                                              migration_info,
3954
                                              True)
3955
    msg = result.RemoteFailMsg()
3956
    if msg:
3957
      logging.error("Instance migration succeeded, but finalization failed:"
3958
                    " %s" % msg)
3959
      raise errors.OpExecError("Could not finalize instance migration: %s" %
3960
                               msg)
3961

    
3962
    self._EnsureSecondary(source_node)
3963
    self._WaitUntilSync()
3964
    self._GoStandalone()
3965
    self._GoReconnect(False)
3966
    self._WaitUntilSync()
3967

    
3968
    self.feedback_fn("* done")
3969

    
3970
  def Exec(self, feedback_fn):
3971
    """Perform the migration.
3972

3973
    """
3974
    self.feedback_fn = feedback_fn
3975

    
3976
    self.source_node = self.instance.primary_node
3977
    self.target_node = self.instance.secondary_nodes[0]
3978
    self.all_nodes = [self.source_node, self.target_node]
3979
    self.nodes_ip = {
3980
      self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
3981
      self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
3982
      }
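    # the DRBD network operations are performed over the nodes' secondary IPs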
3983
    if self.op.cleanup:
3984
      return self._ExecCleanup()
3985
    else:
3986
      return self._ExecMigration()
3987

    
3988

    
3989
def _CreateBlockDev(lu, node, instance, device, force_create,
3990
                    info, force_open):
3991
  """Create a tree of block devices on a given node.
3992

3993
  If this device type has to be created on secondaries, create it and
3994
  all its children.
3995

3996
  If not, just recurse to children keeping the same 'force' value.
3997

3998
  @param lu: the lu on whose behalf we execute
3999
  @param node: the node on which to create the device
4000
  @type instance: L{objects.Instance}
4001
  @param instance: the instance which owns the device
4002
  @type device: L{objects.Disk}
4003
  @param device: the device to create
4004
  @type force_create: boolean
4005
  @param force_create: whether to force creation of this device; this
4006
      will be changed to True whenever we find a device which has
4007
      the CreateOnSecondary() attribute
4008
  @param info: the extra 'metadata' we should attach to the device
4009
      (this will be represented as a LVM tag)
4010
  @type force_open: boolean
4011
  @param force_open: this parameter will be passed to the
4012
      L{backend.BlockdevCreate} function where it specifies
4013
      whether we run on primary or not, and it affects both
4014
      the child assembly and the device's own Open() execution
4015

4016
  """
4017
  if device.CreateOnSecondary():
4018
    force_create = True
4019

    
4020
  if device.children:
4021
    for child in device.children:
4022
      _CreateBlockDev(lu, node, instance, child, force_create,
4023
                      info, force_open)
4024

    
4025
  if not force_create:
4026
    return
4027

    
4028
  _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4029

    
4030

    
4031
def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4032
  """Create a single block device on a given node.
4033

4034
  This will not recurse over children of the device, so they must be
4035
  created in advance.
4036

4037
  @param lu: the lu on whose behalf we execute
4038
  @param node: the node on which to create the device
4039
  @type instance: L{objects.Instance}
4040
  @param instance: the instance which owns the device
4041
  @type device: L{objects.Disk}
4042
  @param device: the device to create
4043
  @param info: the extra 'metadata' we should attach to the device
4044
      (this will be represented as a LVM tag)
4045
  @type force_open: boolean
4046
  @param force_open: this parameter will be passes to the
4047
      L{backend.BlockdevCreate} function where it specifies
4048
      whether we run on primary or not, and it affects both
4049
      the child assembly and the device own Open() execution
4050

4051
  """
4052
  lu.cfg.SetDiskID(device, node)
4053
  result = lu.rpc.call_blockdev_create(node, device, device.size,
4054
                                       instance.name, force_open, info)
4055
  msg = result.RemoteFailMsg()
4056
  if msg:
4057
    raise errors.OpExecError("Can't create block device %s on"
4058
                             " node %s for instance %s: %s" %
4059
                             (device, node, instance.name, msg))
4060
  if device.physical_id is None:
4061
    device.physical_id = result.payload
4062

    
4063

    
4064
def _GenerateUniqueNames(lu, exts):
4065
  """Generate a suitable LV name.
4066

4067
  This will generate logical volume names for the given instance.
4068

4069
  """
4070
  results = []
4071
  for val in exts:
4072
    new_id = lu.cfg.GenerateUniqueID()
4073
    results.append("%s%s" % (new_id, val))
4074
  return results
4075

    
4076

    
4077
def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4078
                         p_minor, s_minor):
4079
  """Generate a drbd8 device complete with its children.
4080

4081
  """
4082
  port = lu.cfg.AllocatePort()
4083
  vgname = lu.cfg.GetVGName()
4084
  shared_secret = lu.cfg.GenerateDRBDSecret()
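  # each DRBD8 device is backed by two local LVs: the data volume and a
  # small (128 MB) metadata volume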
4085
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4086
                          logical_id=(vgname, names[0]))
4087
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4088
                          logical_id=(vgname, names[1]))
4089
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4090
                          logical_id=(primary, secondary, port,
4091
                                      p_minor, s_minor,
4092
                                      shared_secret),
4093
                          children=[dev_data, dev_meta],
4094
                          iv_name=iv_name)
4095
  return drbd_dev
4096

    
4097

    
4098
def _GenerateDiskTemplate(lu, template_name,
4099
                          instance_name, primary_node,
4100
                          secondary_nodes, disk_info,
4101
                          file_storage_dir, file_driver,
4102
                          base_index):
4103
  """Generate the entire disk layout for a given template type.
4104

4105
  """
4106
  #TODO: compute space requirements
4107

    
4108
  vgname = lu.cfg.GetVGName()
4109
  disk_count = len(disk_info)
4110
  disks = []
4111
  if template_name == constants.DT_DISKLESS:
4112
    pass
4113
  elif template_name == constants.DT_PLAIN:
4114
    if len(secondary_nodes) != 0:
4115
      raise errors.ProgrammerError("Wrong template configuration")
4116

    
4117
    names = _GenerateUniqueNames(lu, [".disk%d" % i
4118
                                      for i in range(disk_count)])
4119
    for idx, disk in enumerate(disk_info):
4120
      disk_index = idx + base_index
4121
      disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4122
                              logical_id=(vgname, names[idx]),
4123
                              iv_name="disk/%d" % disk_index,
4124
                              mode=disk["mode"])
4125
      disks.append(disk_dev)
4126
  elif template_name == constants.DT_DRBD8:
4127
    if len(secondary_nodes) != 1:
4128
      raise errors.ProgrammerError("Wrong template configuration")
4129
    remote_node = secondary_nodes[0]
4130
    minors = lu.cfg.AllocateDRBDMinor(
4131
      [primary_node, remote_node] * len(disk_info), instance_name)
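    # two minors are allocated per disk: one for the primary and one for
    # the secondary node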
4132

    
4133
    names = []
4134
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4135
                                               for i in range(disk_count)]):
4136
      names.append(lv_prefix + "_data")
4137
      names.append(lv_prefix + "_meta")
4138
    for idx, disk in enumerate(disk_info):
4139
      disk_index = idx + base_index
4140
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4141
                                      disk["size"], names[idx*2:idx*2+2],
4142
                                      "disk/%d" % disk_index,
4143
                                      minors[idx*2], minors[idx*2+1])
4144
      disk_dev.mode = disk["mode"]
4145
      disks.append(disk_dev)
4146
  elif template_name == constants.DT_FILE:
4147
    if len(secondary_nodes) != 0:
4148
      raise errors.ProgrammerError("Wrong template configuration")
4149

    
4150
    for idx, disk in enumerate(disk_info):
4151
      disk_index = idx + base_index
4152
      disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4153
                              iv_name="disk/%d" % disk_index,
4154
                              logical_id=(file_driver,
4155
                                          "%s/disk%d" % (file_storage_dir,
4156
                                                         disk_index)),
4157
                              mode=disk["mode"])
4158
      disks.append(disk_dev)
4159
  else:
4160
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4161
  return disks
4162

    
4163

    
4164
def _GetInstanceInfoText(instance):
4165
  """Compute that text that should be added to the disk's metadata.
4166

4167
  """
4168
  return "originstname+%s" % instance.name
4169

    
4170

    
4171
def _CreateDisks(lu, instance):
4172
  """Create all disks for an instance.
4173

4174
  This abstracts away some work from AddInstance.
4175

4176
  @type lu: L{LogicalUnit}
4177
  @param lu: the logical unit on whose behalf we execute
4178
  @type instance: L{objects.Instance}
4179
  @param instance: the instance whose disks we should create
4180
  @raise errors.OpExecError: in case of errors
4182

4183
  """
4184
  info = _GetInstanceInfoText(instance)
4185
  pnode = instance.primary_node
4186

    
4187
  if instance.disk_template == constants.DT_FILE:
4188
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4189
    result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4190

    
4191
    if result.failed or not result.data:
4192
      raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4193

    
4194
    if not result.data[0]:
4195
      raise errors.OpExecError("Failed to create directory '%s'" %
4196
                               file_storage_dir)
4197

    
4198
  # Note: this needs to be kept in sync with adding of disks in
4199
  # LUSetInstanceParams
4200
  for device in instance.disks:
4201
    logging.info("Creating volume %s for instance %s",
4202
                 device.iv_name, instance.name)
4203
    #HARDCODE
4204
    for node in instance.all_nodes:
4205
      f_create = node == pnode
4206
      _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4207

    
4208

    
4209
def _RemoveDisks(lu, instance):
4210
  """Remove all disks for an instance.
4211

4212
  This abstracts away some work from `AddInstance()` and
4213
  `RemoveInstance()`. Note that in case some of the devices couldn't
4214
  be removed, the removal will continue with the other ones (compare
4215
  with `_CreateDisks()`).
4216

4217
  @type lu: L{LogicalUnit}
4218
  @param lu: the logical unit on whose behalf we execute
4219
  @type instance: L{objects.Instance}
4220
  @param instance: the instance whose disks we should remove
4221
  @rtype: boolean
4222
  @return: the success of the removal
4223

4224
  """
4225
  logging.info("Removing block devices for instance %s", instance.name)
4226

    
4227
  all_result = True
4228
  for device in instance.disks:
4229
    for node, disk in device.ComputeNodeTree(instance.primary_node):
4230
      lu.cfg.SetDiskID(disk, node)
4231
      msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4232
      if msg:
4233
        lu.LogWarning("Could not remove block device %s on node %s,"
4234
                      " continuing anyway: %s", device.iv_name, node, msg)
4235
        all_result = False
4236

    
4237
  if instance.disk_template == constants.DT_FILE:
4238
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4239
    result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4240
                                                 file_storage_dir)
4241
    if result.failed or not result.data:
4242
      logging.error("Could not remove directory '%s'", file_storage_dir)
4243
      all_result = False
4244

    
4245
  return all_result
4246

    
4247

    
4248
def _ComputeDiskSize(disk_template, disks):
4249
  """Compute disk size requirements in the volume group
4250

4251
  """
4252
  # Required free disk space as a function of the disk template and disk sizes
4253
  req_size_dict = {
4254
    constants.DT_DISKLESS: None,
4255
    constants.DT_PLAIN: sum(d["size"] for d in disks),
4256
    # 128 MB are added for drbd metadata for each disk
4257
    constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4258
    constants.DT_FILE: None,
4259
  }
4260

    
4261
  if disk_template not in req_size_dict:
4262
    raise errors.ProgrammerError("Disk template '%s' size requirement"
4263
                                 " is unknown" %  disk_template)
4264

    
4265
  return req_size_dict[disk_template]
4266

    
4267

    
4268
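# Worked example for the helper above (hypothetical sizes, illustration
# only): a DRBD8 instance with two disks of 1024 and 2048 MiB needs
#
#   _ComputeDiskSize(constants.DT_DRBD8, [{"size": 1024}, {"size": 2048}])
#   == (1024 + 128) + (2048 + 128) == 3328
#
# MiB of free space in the volume group, the extra 128 MiB per disk being
# the DRBD metadata mentioned in the comment above.

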
def _CheckHVParams(lu, nodenames, hvname, hvparams):
  """Hypervisor parameter validation.

  This function abstracts the hypervisor parameter validation to be
  used in both instance create and instance modify.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type nodenames: list
  @param nodenames: the list of nodes on which we should check
  @type hvname: string
  @param hvname: the name of the hypervisor we should use
  @type hvparams: dict
  @param hvparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
                                                  hvname,
                                                  hvparams)
  for node in nodenames:
    info = hvinfo[node]
    if info.offline:
      continue
    msg = info.RemoteFailMsg()
    if msg:
      raise errors.OpPrereqError("Hypervisor parameter validation"
                                 " failed on node %s: %s" % (node, msg))


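# Usage note: LUCreateInstance.CheckPrereq further below calls the helper
# above as
#
#   _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
#
# with nodenames being the primary node plus any secondaries, so offline
# nodes are skipped and the first remote validation failure aborts the
# opcode with OpPrereqError.

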
class LUCreateInstance(LogicalUnit):
4299
  """Create an instance.
4300

4301
  """
4302
  HPATH = "instance-add"
4303
  HTYPE = constants.HTYPE_INSTANCE
4304
  _OP_REQP = ["instance_name", "disks", "disk_template",
4305
              "mode", "start",
4306
              "wait_for_sync", "ip_check", "nics",
4307
              "hvparams", "beparams"]
4308
  REQ_BGL = False
4309

    
4310
  def _ExpandNode(self, node):
4311
    """Expands and checks one node name.
4312

4313
    """
4314
    node_full = self.cfg.ExpandNodeName(node)
4315
    if node_full is None:
4316
      raise errors.OpPrereqError("Unknown node %s" % node)
4317
    return node_full
4318

    
4319
  def ExpandNames(self):
4320
    """ExpandNames for CreateInstance.
4321

4322
    Figure out the right locks for instance creation.
4323

4324
    """
4325
    self.needed_locks = {}
4326

    
4327
    # set optional parameters to none if they don't exist
4328
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4329
      if not hasattr(self.op, attr):
4330
        setattr(self.op, attr, None)
4331

    
4332
    # cheap checks, mostly valid constants given
4333

    
4334
    # verify creation mode
4335
    if self.op.mode not in (constants.INSTANCE_CREATE,
4336
                            constants.INSTANCE_IMPORT):
4337
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4338
                                 self.op.mode)
4339

    
4340
    # disk template and mirror node verification
4341
    if self.op.disk_template not in constants.DISK_TEMPLATES:
4342
      raise errors.OpPrereqError("Invalid disk template name")
4343

    
4344
    if self.op.hypervisor is None:
4345
      self.op.hypervisor = self.cfg.GetHypervisorType()
4346

    
4347
    cluster = self.cfg.GetClusterInfo()
4348
    enabled_hvs = cluster.enabled_hypervisors
4349
    if self.op.hypervisor not in enabled_hvs:
4350
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4351
                                 " cluster (%s)" % (self.op.hypervisor,
4352
                                  ",".join(enabled_hvs)))
4353

    
4354
    # check hypervisor parameter syntax (locally)
4355
    utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4356
    filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4357
                                  self.op.hvparams)
4358
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4359
    hv_type.CheckParameterSyntax(filled_hvp)
4360

    
4361
    # fill and remember the beparams dict
4362
    utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4363
    self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4364
                                    self.op.beparams)
4365

    
4366
    #### instance parameters check
4367

    
4368
    # instance name verification
4369
    hostname1 = utils.HostInfo(self.op.instance_name)
4370
    self.op.instance_name = instance_name = hostname1.name
4371

    
4372
    # this is just a preventive check, but someone might still add this
4373
    # instance in the meantime, and creation will fail at lock-add time
4374
    if instance_name in self.cfg.GetInstanceList():
4375
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4376
                                 instance_name)
4377

    
4378
    self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4379

    
4380
    # NIC buildup
4381
    self.nics = []
4382
    for nic in self.op.nics:
4383
      # ip validity checks
4384
      ip = nic.get("ip", None)
4385
      if ip is None or ip.lower() == "none":
4386
        nic_ip = None
4387
      elif ip.lower() == constants.VALUE_AUTO:
4388
        nic_ip = hostname1.ip
4389
      else:
4390
        if not utils.IsValidIP(ip):
4391
          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4392
                                     " like a valid IP" % ip)
4393
        nic_ip = ip
4394

    
4395
      # MAC address verification
4396
      mac = nic.get("mac", constants.VALUE_AUTO)
4397
      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4398
        if not utils.IsValidMac(mac.lower()):
4399
          raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4400
                                     mac)
4401
      # bridge verification
4402
      bridge = nic.get("bridge", None)
4403
      if bridge is None:
4404
        bridge = self.cfg.GetDefBridge()
4405
      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4406

    
4407
    # disk checks/pre-build
4408
    self.disks = []
4409
    for disk in self.op.disks:
4410
      mode = disk.get("mode", constants.DISK_RDWR)
4411
      if mode not in constants.DISK_ACCESS_SET:
4412
        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4413
                                   mode)
4414
      size = disk.get("size", None)
4415
      if size is None:
4416
        raise errors.OpPrereqError("Missing disk size")
4417
      try:
4418
        size = int(size)
4419
      except ValueError:
4420
        raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4421
      self.disks.append({"size": size, "mode": mode})
4422

    
4423
    # used in CheckPrereq for ip ping check
4424
    self.check_ip = hostname1.ip
4425

    
4426
    # file storage checks
4427
    if (self.op.file_driver and
4428
        not self.op.file_driver in constants.FILE_DRIVER):
4429
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
4430
                                 self.op.file_driver)
4431

    
4432
    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4433
      raise errors.OpPrereqError("File storage directory path not absolute")
4434

    
4435
    ### Node/iallocator related checks
4436
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
4437
      raise errors.OpPrereqError("One and only one of iallocator and primary"
4438
                                 " node must be given")
4439

    
4440
    if self.op.iallocator:
4441
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4442
    else:
4443
      self.op.pnode = self._ExpandNode(self.op.pnode)
4444
      nodelist = [self.op.pnode]
4445
      if self.op.snode is not None:
4446
        self.op.snode = self._ExpandNode(self.op.snode)
4447
        nodelist.append(self.op.snode)
4448
      self.needed_locks[locking.LEVEL_NODE] = nodelist
4449

    
4450
    # in case of import lock the source node too
4451
    if self.op.mode == constants.INSTANCE_IMPORT:
4452
      src_node = getattr(self.op, "src_node", None)
4453
      src_path = getattr(self.op, "src_path", None)
4454

    
4455
      if src_path is None:
4456
        self.op.src_path = src_path = self.op.instance_name
4457

    
4458
      if src_node is None:
4459
        self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4460
        self.op.src_node = None
4461
        if os.path.isabs(src_path):
4462
          raise errors.OpPrereqError("Importing an instance from an absolute"
4463
                                     " path requires a source node option.")
4464
      else:
4465
        self.op.src_node = src_node = self._ExpandNode(src_node)
4466
        if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4467
          self.needed_locks[locking.LEVEL_NODE].append(src_node)
4468
        if not os.path.isabs(src_path):
4469
          self.op.src_path = src_path = \
4470
            os.path.join(constants.EXPORT_DIR, src_path)
4471

    
4472
    else: # INSTANCE_CREATE
4473
      if getattr(self.op, "os_type", None) is None:
4474
        raise errors.OpPrereqError("No guest OS specified")
4475
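  # Shape of the nics/disks parameters consumed by ExpandNames above
  # (hypothetical values, for illustration only):
  #
  #   nics:  [{"ip": "auto", "mac": "auto", "bridge": None}]
  #   disks: [{"size": 10240, "mode": constants.DISK_RDWR}]
  #
  # "auto" for the IP resolves to the instance's own address, a missing
  # bridge falls back to the cluster default, and sizes are plain
  # integers (MiB).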

    
4476
  def _RunAllocator(self):
4477
    """Run the allocator based on input opcode.
4478

4479
    """
4480
    nics = [n.ToDict() for n in self.nics]
4481
    ial = IAllocator(self,
4482
                     mode=constants.IALLOCATOR_MODE_ALLOC,
4483
                     name=self.op.instance_name,
4484
                     disk_template=self.op.disk_template,
4485
                     tags=[],
4486
                     os=self.op.os_type,
4487
                     vcpus=self.be_full[constants.BE_VCPUS],
4488
                     mem_size=self.be_full[constants.BE_MEMORY],
4489
                     disks=self.disks,
4490
                     nics=nics,
4491
                     hypervisor=self.op.hypervisor,
4492
                     )
4493

    
4494
    ial.Run(self.op.iallocator)
4495

    
4496
    if not ial.success:
4497
      raise errors.OpPrereqError("Can't compute nodes using"
4498
                                 " iallocator '%s': %s" % (self.op.iallocator,
4499
                                                           ial.info))
4500
    if len(ial.nodes) != ial.required_nodes:
4501
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4502
                                 " of nodes (%s), required %s" %
4503
                                 (self.op.iallocator, len(ial.nodes),
4504
                                  ial.required_nodes))
4505
    self.op.pnode = ial.nodes[0]
4506
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4507
                 self.op.instance_name, self.op.iallocator,
4508
                 ", ".join(ial.nodes))
4509
    if ial.required_nodes == 2:
4510
      self.op.snode = ial.nodes[1]
4511

    
4512
  def BuildHooksEnv(self):
4513
    """Build hooks env.
4514

4515
    This runs on master, primary and secondary nodes of the instance.
4516

4517
    """
4518
    env = {
4519
      "ADD_MODE": self.op.mode,
4520
      }
4521
    if self.op.mode == constants.INSTANCE_IMPORT:
4522
      env["SRC_NODE"] = self.op.src_node
4523
      env["SRC_PATH"] = self.op.src_path
4524
      env["SRC_IMAGES"] = self.src_images
4525

    
4526
    env.update(_BuildInstanceHookEnv(
4527
      name=self.op.instance_name,
4528
      primary_node=self.op.pnode,
4529
      secondary_nodes=self.secondaries,
4530
      status=self.op.start,
4531
      os_type=self.op.os_type,
4532
      memory=self.be_full[constants.BE_MEMORY],
4533
      vcpus=self.be_full[constants.BE_VCPUS],
4534
      nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4535
      disk_template=self.op.disk_template,
4536
      disks=[(d["size"], d["mode"]) for d in self.disks],
4537
    ))
4538

    
4539
    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4540
          self.secondaries)
4541
    return env, nl, nl
4542

    
4543

    
4544
  def CheckPrereq(self):
4545
    """Check prerequisites.
4546

4547
    """
4548
    if (not self.cfg.GetVGName() and
4549
        self.op.disk_template not in constants.DTS_NOT_LVM):
4550
      raise errors.OpPrereqError("Cluster does not support lvm-based"
4551
                                 " instances")
4552

    
4553
    if self.op.mode == constants.INSTANCE_IMPORT:
4554
      src_node = self.op.src_node
4555
      src_path = self.op.src_path
4556

    
4557
      if src_node is None:
4558
        exp_list = self.rpc.call_export_list(
4559
          self.acquired_locks[locking.LEVEL_NODE])
4560
        found = False
4561
        for node in exp_list:
4562
          if not exp_list[node].failed and src_path in exp_list[node].data:
4563
            found = True
4564
            self.op.src_node = src_node = node
4565
            self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4566
                                                       src_path)
4567
            break
4568
        if not found:
4569
          raise errors.OpPrereqError("No export found for relative path %s" %
4570
                                      src_path)
4571

    
4572
      _CheckNodeOnline(self, src_node)
4573
      result = self.rpc.call_export_info(src_node, src_path)
4574
      result.Raise()
4575
      if not result.data:
4576
        raise errors.OpPrereqError("No export found in dir %s" % src_path)
4577

    
4578
      export_info = result.data
4579
      if not export_info.has_section(constants.INISECT_EXP):
4580
        raise errors.ProgrammerError("Corrupted export config")
4581

    
4582
      ei_version = export_info.get(constants.INISECT_EXP, 'version')
4583
      if (int(ei_version) != constants.EXPORT_VERSION):
4584
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4585
                                   (ei_version, constants.EXPORT_VERSION))
4586

    
4587
      # Check that the new instance doesn't have less disks than the export
4588
      instance_disks = len(self.disks)
4589
      export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4590
      if instance_disks < export_disks:
4591
        raise errors.OpPrereqError("Not enough disks to import."
4592
                                   " (instance: %d, export: %d)" %
4593
                                   (instance_disks, export_disks))
4594

    
4595
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4596
      disk_images = []
4597
      for idx in range(export_disks):
4598
        option = 'disk%d_dump' % idx
4599
        if export_info.has_option(constants.INISECT_INS, option):
4600
          # FIXME: are the old os-es, disk sizes, etc. useful?
4601
          export_name = export_info.get(constants.INISECT_INS, option)
4602
          image = os.path.join(src_path, export_name)
4603
          disk_images.append(image)
4604
        else:
4605
          disk_images.append(False)
4606

    
4607
      self.src_images = disk_images
4608

    
4609
      old_name = export_info.get(constants.INISECT_INS, 'name')
4610
      # FIXME: int() here could throw a ValueError on broken exports
4611
      exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4612
      if self.op.instance_name == old_name:
4613
        for idx, nic in enumerate(self.nics):
4614
          if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4615
            nic_mac_ini = 'nic%d_mac' % idx
4616
            nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4617

    
4618
    # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4619
    # ip ping checks (we use the same ip that was resolved in ExpandNames)
4620
    if self.op.start and not self.op.ip_check:
4621
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4622
                                 " adding an instance in start mode")
4623

    
4624
    if self.op.ip_check:
4625
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4626
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
4627
                                   (self.check_ip, self.op.instance_name))
4628

    
4629
    #### mac address generation
4630
    # By generating here the mac address both the allocator and the hooks get
4631
    # the real final mac address rather than the 'auto' or 'generate' value.
4632
    # There is a race condition between the generation and the instance object
4633
    # creation, which means that we know the mac is valid now, but we're not
4634
    # sure it will be when we actually add the instance. If things go bad
4635
    # adding the instance will abort because of a duplicate mac, and the
4636
    # creation job will fail.
4637
    for nic in self.nics:
4638
      if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4639
        nic.mac = self.cfg.GenerateMAC()
4640

    
4641
    #### allocator run
4642

    
4643
    if self.op.iallocator is not None:
4644
      self._RunAllocator()
4645

    
4646
    #### node related checks
4647

    
4648
    # check primary node
4649
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4650
    assert self.pnode is not None, \
4651
      "Cannot retrieve locked node %s" % self.op.pnode
4652
    if pnode.offline:
4653
      raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4654
                                 pnode.name)
4655
    if pnode.drained:
4656
      raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4657
                                 pnode.name)
4658

    
4659
    self.secondaries = []
4660

    
4661
    # mirror node verification
4662
    if self.op.disk_template in constants.DTS_NET_MIRROR:
4663
      if self.op.snode is None:
4664
        raise errors.OpPrereqError("The networked disk templates need"
4665
                                   " a mirror node")
4666
      if self.op.snode == pnode.name:
4667
        raise errors.OpPrereqError("The secondary node cannot be"
4668
                                   " the primary node.")
4669
      _CheckNodeOnline(self, self.op.snode)
4670
      _CheckNodeNotDrained(self, self.op.snode)
4671
      self.secondaries.append(self.op.snode)
4672

    
4673
    nodenames = [pnode.name] + self.secondaries
4674

    
4675
    req_size = _ComputeDiskSize(self.op.disk_template,
4676
                                self.disks)
4677

    
4678
    # Check lv size requirements
4679
    if req_size is not None:
4680
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4681
                                         self.op.hypervisor)
4682
      for node in nodenames:
4683
        info = nodeinfo[node]
4684
        info.Raise()
4685
        info = info.data
4686
        if not info:
4687
          raise errors.OpPrereqError("Cannot get current information"
4688
                                     " from node '%s'" % node)
4689
        vg_free = info.get('vg_free', None)
4690
        if not isinstance(vg_free, int):
4691
          raise errors.OpPrereqError("Can't compute free disk space on"
4692
                                     " node %s" % node)
4693
        if req_size > info['vg_free']:
4694
          raise errors.OpPrereqError("Not enough disk space on target node %s."
4695
                                     " %d MB available, %d MB required" %
4696
                                     (node, info['vg_free'], req_size))
4697

    
4698
    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4699

    
4700
    # os verification
4701
    result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4702
    result.Raise()
4703
    if not isinstance(result.data, objects.OS):
4704
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
4705
                                 " primary node"  % self.op.os_type)
4706

    
4707
    # bridge check on primary node
4708
    bridges = [n.bridge for n in self.nics]
4709
    result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4710
    result.Raise()
4711
    if not result.data:
4712
      raise errors.OpPrereqError("One of the target bridges '%s' does not"
4713
                                 " exist on destination node '%s'" %
4714
                                 (",".join(bridges), pnode.name))
4715

    
4716
    # memory check on primary node
4717
    if self.op.start:
4718
      _CheckNodeFreeMemory(self, self.pnode.name,
4719
                           "creating instance %s" % self.op.instance_name,
4720
                           self.be_full[constants.BE_MEMORY],
4721
                           self.op.hypervisor)
4722

    
4723
  def Exec(self, feedback_fn):
4724
    """Create and add the instance to the cluster.
4725

4726
    """
4727
    instance = self.op.instance_name
4728
    pnode_name = self.pnode.name
4729

    
4730
    ht_kind = self.op.hypervisor
4731
    if ht_kind in constants.HTS_REQ_PORT:
4732
      network_port = self.cfg.AllocatePort()
4733
    else:
4734
      network_port = None
4735

    
4736
    ##if self.op.vnc_bind_address is None:
4737
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4738

    
4739
    # this is needed because os.path.join does not accept None arguments
4740
    if self.op.file_storage_dir is None:
4741
      string_file_storage_dir = ""
4742
    else:
4743
      string_file_storage_dir = self.op.file_storage_dir
4744

    
4745
    # build the full file storage dir path
4746
    file_storage_dir = os.path.normpath(os.path.join(
4747
                                        self.cfg.GetFileStorageDir(),
4748
                                        string_file_storage_dir, instance))
4749

    
4750

    
4751
    disks = _GenerateDiskTemplate(self,
4752
                                  self.op.disk_template,
4753
                                  instance, pnode_name,
4754
                                  self.secondaries,
4755
                                  self.disks,
4756
                                  file_storage_dir,
4757
                                  self.op.file_driver,
4758
                                  0)
4759

    
4760
    iobj = objects.Instance(name=instance, os=self.op.os_type,
4761
                            primary_node=pnode_name,
4762
                            nics=self.nics, disks=disks,
4763
                            disk_template=self.op.disk_template,
4764
                            admin_up=False,
4765
                            network_port=network_port,
4766
                            beparams=self.op.beparams,
4767
                            hvparams=self.op.hvparams,
4768
                            hypervisor=self.op.hypervisor,
4769
                            )
4770

    
4771
    feedback_fn("* creating instance disks...")
4772
    try:
4773
      _CreateDisks(self, iobj)
4774
    except errors.OpExecError:
4775
      self.LogWarning("Device creation failed, reverting...")
4776
      try:
4777
        _RemoveDisks(self, iobj)
4778
      finally:
4779
        self.cfg.ReleaseDRBDMinors(instance)
4780
        raise
4781

    
4782
    feedback_fn("adding instance %s to cluster config" % instance)
4783

    
4784
    self.cfg.AddInstance(iobj)
4785
    # Declare that we don't want to remove the instance lock anymore, as we've
4786
    # added the instance to the config
4787
    del self.remove_locks[locking.LEVEL_INSTANCE]
4788
    # Unlock all the nodes
4789
    if self.op.mode == constants.INSTANCE_IMPORT:
4790
      nodes_keep = [self.op.src_node]
4791
      nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4792
                       if node != self.op.src_node]
4793
      self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4794
      self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4795
    else:
4796
      self.context.glm.release(locking.LEVEL_NODE)
4797
      del self.acquired_locks[locking.LEVEL_NODE]
4798

    
4799
    if self.op.wait_for_sync:
4800
      disk_abort = not _WaitForSync(self, iobj)
4801
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
4802
      # make sure the disks are not degraded (still sync-ing is ok)
4803
      time.sleep(15)
4804
      feedback_fn("* checking mirrors status")
4805
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4806
    else:
4807
      disk_abort = False
4808

    
4809
    if disk_abort:
4810
      _RemoveDisks(self, iobj)
4811
      self.cfg.RemoveInstance(iobj.name)
4812
      # Make sure the instance lock gets removed
4813
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4814
      raise errors.OpExecError("There are some degraded disks for"
4815
                               " this instance")
4816

    
4817
    feedback_fn("creating os for instance %s on node %s" %
4818
                (instance, pnode_name))
4819

    
4820
    if iobj.disk_template != constants.DT_DISKLESS:
4821
      if self.op.mode == constants.INSTANCE_CREATE:
4822
        feedback_fn("* running the instance OS create scripts...")
4823
        result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
4824
        msg = result.RemoteFailMsg()
4825
        if msg:
4826
          raise errors.OpExecError("Could not add os for instance %s"
4827
                                   " on node %s: %s" %
4828
                                   (instance, pnode_name, msg))
4829

    
4830
      elif self.op.mode == constants.INSTANCE_IMPORT:
4831
        feedback_fn("* running the instance OS import scripts...")
4832
        src_node = self.op.src_node
4833
        src_images = self.src_images
4834
        cluster_name = self.cfg.GetClusterName()
4835
        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4836
                                                         src_node, src_images,
4837
                                                         cluster_name)
4838
        import_result.Raise()
4839
        for idx, result in enumerate(import_result.data):
4840
          if not result:
4841
            self.LogWarning("Could not import the image %s for instance"
4842
                            " %s, disk %d, on node %s" %
4843
                            (src_images[idx], instance, idx, pnode_name))
4844
      else:
4845
        # also checked in the prereq part
4846
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4847
                                     % self.op.mode)
4848

    
4849
    if self.op.start:
4850
      iobj.admin_up = True
4851
      self.cfg.Update(iobj)
4852
      logging.info("Starting instance %s on node %s", instance, pnode_name)
4853
      feedback_fn("* starting instance...")
4854
      result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4855
      msg = result.RemoteFailMsg()
4856
      if msg:
4857
        raise errors.OpExecError("Could not start instance: %s" % msg)
4858

    
4859

    
4860
class LUConnectConsole(NoHooksLU):
4861
  """Connect to an instance's console.
4862

4863
  This is somewhat special in that it returns the command line that
4864
  you need to run on the master node in order to connect to the
4865
  console.
4866

4867
  """
4868
  _OP_REQP = ["instance_name"]
4869
  REQ_BGL = False
4870

    
4871
  def ExpandNames(self):
4872
    self._ExpandAndLockInstance()
4873

    
4874
  def CheckPrereq(self):
4875
    """Check prerequisites.
4876

4877
    This checks that the instance is in the cluster.
4878

4879
    """
4880
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4881
    assert self.instance is not None, \
4882
      "Cannot retrieve locked instance %s" % self.op.instance_name
4883
    _CheckNodeOnline(self, self.instance.primary_node)
4884

    
4885
  def Exec(self, feedback_fn):
4886
    """Connect to the console of an instance
4887

4888
    """
4889
    instance = self.instance
4890
    node = instance.primary_node
4891

    
4892
    node_insts = self.rpc.call_instance_list([node],
4893
                                             [instance.hypervisor])[node]
4894
    node_insts.Raise()
4895

    
4896
    if instance.name not in node_insts.data:
4897
      raise errors.OpExecError("Instance %s is not running." % instance.name)
4898

    
4899
    logging.debug("Connecting to console of %s on %s", instance.name, node)
4900

    
4901
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
4902
    cluster = self.cfg.GetClusterInfo()
4903
    # beparams and hvparams are passed separately, to avoid editing the
4904
    # instance and then saving the defaults in the instance itself.
4905
    hvparams = cluster.FillHV(instance)
4906
    beparams = cluster.FillBE(instance)
4907
    console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
4908

    
4909
    # build ssh cmdline
4910
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4911

    
4912

    
4913
class LUReplaceDisks(LogicalUnit):
4914
  """Replace the disks of an instance.
4915

4916
  """
4917
  HPATH = "mirrors-replace"
4918
  HTYPE = constants.HTYPE_INSTANCE
4919
  _OP_REQP = ["instance_name", "mode", "disks"]
4920
  REQ_BGL = False
4921

    
4922
  def CheckArguments(self):
4923
    if not hasattr(self.op, "remote_node"):
4924
      self.op.remote_node = None
4925
    if not hasattr(self.op, "iallocator"):
4926
      self.op.iallocator = None
4927

    
4928
    # check for valid parameter combination
4929
    cnt = [self.op.remote_node, self.op.iallocator].count(None)
4930
    if self.op.mode == constants.REPLACE_DISK_CHG:
4931
      if cnt == 2:
4932
        raise errors.OpPrereqError("When changing the secondary either an"
4933
                                   " iallocator script must be used or the"
4934
                                   " new node given")
4935
      elif cnt == 0:
4936
        raise errors.OpPrereqError("Give either the iallocator or the new"
4937
                                   " secondary, not both")
4938
    else: # not replacing the secondary
4939
      if cnt != 2:
4940
        raise errors.OpPrereqError("The iallocator and new node options can"
4941
                                   " be used only when changing the"
4942
                                   " secondary node")
4943
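  # Summary of the argument combinations accepted above (illustration only,
  # not part of the original source):
  #
  #   mode                    remote_node  iallocator  result
  #   REPLACE_DISK_CHG        given        None        ok
  #   REPLACE_DISK_CHG        None         given       ok
  #   REPLACE_DISK_CHG        None         None        error (need one)
  #   REPLACE_DISK_CHG        given        given       error (not both)
  #   REPLACE_DISK_PRI/_SEC   None         None        ok
  #   REPLACE_DISK_PRI/_SEC   any other combination    error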

    
4944
  def ExpandNames(self):
4945
    self._ExpandAndLockInstance()
4946

    
4947
    if self.op.iallocator is not None:
4948
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4949
    elif self.op.remote_node is not None:
4950
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
4951
      if remote_node is None:
4952
        raise errors.OpPrereqError("Node '%s' not known" %
4953
                                   self.op.remote_node)
4954
      self.op.remote_node = remote_node
4955
      # Warning: do not remove the locking of the new secondary here
4956
      # unless DRBD8.AddChildren is changed to work in parallel;
4957
      # currently it doesn't since parallel invocations of
4958
      # FindUnusedMinor will conflict
4959
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
4960
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4961
    else:
4962
      self.needed_locks[locking.LEVEL_NODE] = []
4963
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4964

    
4965
  def DeclareLocks(self, level):
4966
    # If we're not already locking all nodes in the set we have to declare the
4967
    # instance's primary/secondary nodes.
4968
    if (level == locking.LEVEL_NODE and
4969
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
4970
      self._LockInstancesNodes()
4971

    
4972
  def _RunAllocator(self):
4973
    """Compute a new secondary node using an IAllocator.
4974

4975
    """
4976
    ial = IAllocator(self,
4977
                     mode=constants.IALLOCATOR_MODE_RELOC,
4978
                     name=self.op.instance_name,
4979
                     relocate_from=[self.sec_node])
4980

    
4981
    ial.Run(self.op.iallocator)
4982

    
4983
    if not ial.success:
4984
      raise errors.OpPrereqError("Can't compute nodes using"
4985
                                 " iallocator '%s': %s" % (self.op.iallocator,
4986
                                                           ial.info))
4987
    if len(ial.nodes) != ial.required_nodes:
4988
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4989
                                 " of nodes (%s), required %s" %
4990
                                 (len(ial.nodes), ial.required_nodes))
4991
    self.op.remote_node = ial.nodes[0]
4992
    self.LogInfo("Selected new secondary for the instance: %s",
4993
                 self.op.remote_node)
4994

    
4995
  def BuildHooksEnv(self):
4996
    """Build hooks env.
4997

4998
    This runs on the master, the primary and all the secondaries.
4999

5000
    """
5001
    env = {
5002
      "MODE": self.op.mode,
5003
      "NEW_SECONDARY": self.op.remote_node,
5004
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
5005
      }
5006
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5007
    nl = [
5008
      self.cfg.GetMasterNode(),
5009
      self.instance.primary_node,
5010
      ]
5011
    if self.op.remote_node is not None:
5012
      nl.append(self.op.remote_node)
5013
    return env, nl, nl
5014

    
5015
  def CheckPrereq(self):
5016
    """Check prerequisites.
5017

5018
    This checks that the instance is in the cluster.
5019

5020
    """
5021
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5022
    assert instance is not None, \
5023
      "Cannot retrieve locked instance %s" % self.op.instance_name
5024
    self.instance = instance
5025

    
5026
    if instance.disk_template != constants.DT_DRBD8:
5027
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5028
                                 " instances")
5029

    
5030
    if len(instance.secondary_nodes) != 1:
5031
      raise errors.OpPrereqError("The instance has a strange layout,"
5032
                                 " expected one secondary but found %d" %
5033
                                 len(instance.secondary_nodes))
5034

    
5035
    self.sec_node = instance.secondary_nodes[0]
5036

    
5037
    if self.op.iallocator is not None:
5038
      self._RunAllocator()
5039

    
5040
    remote_node = self.op.remote_node
5041
    if remote_node is not None:
5042
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5043
      assert self.remote_node_info is not None, \
5044
        "Cannot retrieve locked node %s" % remote_node
5045
    else:
5046
      self.remote_node_info = None
5047
    if remote_node == instance.primary_node:
5048
      raise errors.OpPrereqError("The specified node is the primary node of"
5049
                                 " the instance.")
5050
    elif remote_node == self.sec_node:
5051
      raise errors.OpPrereqError("The specified node is already the"
5052
                                 " secondary node of the instance.")
5053

    
5054
    if self.op.mode == constants.REPLACE_DISK_PRI:
5055
      n1 = self.tgt_node = instance.primary_node
5056
      n2 = self.oth_node = self.sec_node
5057
    elif self.op.mode == constants.REPLACE_DISK_SEC:
5058
      n1 = self.tgt_node = self.sec_node
5059
      n2 = self.oth_node = instance.primary_node
5060
    elif self.op.mode == constants.REPLACE_DISK_CHG:
5061
      n1 = self.new_node = remote_node
5062
      n2 = self.oth_node = instance.primary_node
5063
      self.tgt_node = self.sec_node
5064
      _CheckNodeNotDrained(self, remote_node)
5065
    else:
5066
      raise errors.ProgrammerError("Unhandled disk replace mode")
5067

    
5068
    _CheckNodeOnline(self, n1)
5069
    _CheckNodeOnline(self, n2)
5070

    
5071
    if not self.op.disks:
5072
      self.op.disks = range(len(instance.disks))
5073

    
5074
    for disk_idx in self.op.disks:
5075
      instance.FindDisk(disk_idx)
5076

    
5077
  def _ExecD8DiskOnly(self, feedback_fn):
5078
    """Replace a disk on the primary or secondary for dbrd8.
5079

5080
    The algorithm for replace is quite complicated:
5081

5082
      1. for each disk to be replaced:
5083

5084
        1. create new LVs on the target node with unique names
5085
        1. detach old LVs from the drbd device
5086
        1. rename old LVs to name_replaced.<time_t>
5087
        1. rename new LVs to old LVs
5088
        1. attach the new LVs (with the old names now) to the drbd device
5089

5090
      1. wait for sync across all devices
5091

5092
      1. for each modified disk:
5093

5094
        1. remove old LVs (which have the name name_replaces.<time_t>)
5095

5096
    Failures are not very well handled.
5097

5098
    """
5099
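    # Illustrative summary of the per-disk LV shuffle performed below (not
    # part of the original code; LV names are schematic):
    #   - new data/meta LVs are created under unique temporary names,
    #   - the old LVs are detached and renamed to <name>_replaced-<time_t>,
    #   - the new LVs take over the old names and are re-attached to the
    #     drbd device,
    #   - after the sync, step 6 removes the *_replaced-* volumes.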
    steps_total = 6
5100
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5101
    instance = self.instance
5102
    iv_names = {}
5103
    vgname = self.cfg.GetVGName()
5104
    # start of work
5105
    cfg = self.cfg
5106
    tgt_node = self.tgt_node
5107
    oth_node = self.oth_node
5108

    
5109
    # Step: check device activation
5110
    self.proc.LogStep(1, steps_total, "check device existence")
5111
    info("checking volume groups")
5112
    my_vg = cfg.GetVGName()
5113
    results = self.rpc.call_vg_list([oth_node, tgt_node])
5114
    if not results:
5115
      raise errors.OpExecError("Can't list volume groups on the nodes")
5116
    for node in oth_node, tgt_node:
5117
      res = results[node]
5118
      if res.failed or not res.data or my_vg not in res.data:
5119
        raise errors.OpExecError("Volume group '%s' not found on %s" %
5120
                                 (my_vg, node))
5121
    for idx, dev in enumerate(instance.disks):
5122
      if idx not in self.op.disks:
5123
        continue
5124
      for node in tgt_node, oth_node:
5125
        info("checking disk/%d on %s" % (idx, node))
5126
        cfg.SetDiskID(dev, node)
5127
        result = self.rpc.call_blockdev_find(node, dev)
5128
        msg = result.RemoteFailMsg()
5129
        if not msg and not result.payload:
5130
          msg = "disk not found"
5131
        if msg:
5132
          raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5133
                                   (idx, node, msg))
5134

    
5135
    # Step: check other node consistency
5136
    self.proc.LogStep(2, steps_total, "check peer consistency")
5137
    for idx, dev in enumerate(instance.disks):
5138
      if idx not in self.op.disks:
5139
        continue
5140
      info("checking disk/%d consistency on %s" % (idx, oth_node))
5141
      if not _CheckDiskConsistency(self, dev, oth_node,
5142
                                   oth_node==instance.primary_node):
5143
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5144
                                 " to replace disks on this node (%s)" %
5145
                                 (oth_node, tgt_node))
5146

    
5147
    # Step: create new storage
5148
    self.proc.LogStep(3, steps_total, "allocate new storage")
5149
    for idx, dev in enumerate(instance.disks):
5150
      if idx not in self.op.disks:
5151
        continue
5152
      size = dev.size
5153
      cfg.SetDiskID(dev, tgt_node)
5154
      lv_names = [".disk%d_%s" % (idx, suf)
5155
                  for suf in ["data", "meta"]]
5156
      names = _GenerateUniqueNames(self, lv_names)
5157
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5158
                             logical_id=(vgname, names[0]))
5159
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5160
                             logical_id=(vgname, names[1]))
5161
      new_lvs = [lv_data, lv_meta]
5162
      old_lvs = dev.children
5163
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5164
      info("creating new local storage on %s for %s" %
5165
           (tgt_node, dev.iv_name))
5166
      # we pass force_create=True to force the LVM creation
5167
      for new_lv in new_lvs:
5168
        _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5169
                        _GetInstanceInfoText(instance), False)
5170

    
5171
    # Step: for each lv, detach+rename*2+attach
5172
    self.proc.LogStep(4, steps_total, "change drbd configuration")
5173
    for dev, old_lvs, new_lvs in iv_names.itervalues():
5174
      info("detaching %s drbd from local storage" % dev.iv_name)
5175
      result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5176
      result.Raise()
5177
      if not result.data:
5178
        raise errors.OpExecError("Can't detach drbd from local storage on node"
5179
                                 " %s for device %s" % (tgt_node, dev.iv_name))
5180
      #dev.children = []
5181
      #cfg.Update(instance)
5182

    
5183
      # ok, we created the new LVs, so now we know we have the needed
5184
      # storage; as such, we proceed on the target node to rename
5185
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5186
      # using the assumption that logical_id == physical_id (which in
5187
      # turn is the unique_id on that node)
5188

    
5189
      # FIXME(iustin): use a better name for the replaced LVs
5190
      temp_suffix = int(time.time())
5191
      ren_fn = lambda d, suff: (d.physical_id[0],
5192
                                d.physical_id[1] + "_replaced-%s" % suff)
5193
      # build the rename list based on what LVs exist on the node
5194
      rlist = []
5195
      for to_ren in old_lvs:
5196
        result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5197
        if not result.RemoteFailMsg() and result.payload:
5198
          # device exists
5199
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5200

    
5201
      info("renaming the old LVs on the target node")
5202
      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5203
      result.Raise()
5204
      if not result.data:
5205
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
5206
      # now we rename the new LVs to the old LVs
5207
      info("renaming the new LVs on the target node")
5208
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5209
      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5210
      result.Raise()
5211
      if not result.data:
5212
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
5213

    
5214
      for old, new in zip(old_lvs, new_lvs):
5215
        new.logical_id = old.logical_id
5216
        cfg.SetDiskID(new, tgt_node)
5217

    
5218
      for disk in old_lvs:
5219
        disk.logical_id = ren_fn(disk, temp_suffix)
5220
        cfg.SetDiskID(disk, tgt_node)
5221

    
5222
      # now that the new lvs have the old name, we can add them to the device
5223
      info("adding new mirror component on %s" % tgt_node)
5224
      result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5225
      if result.failed or not result.data:
5226
        for new_lv in new_lvs:
5227
          msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5228
          if msg:
5229
            warning("Can't rollback device %s: %s", dev, msg,
5230
                    hint="cleanup manually the unused logical volumes")
5231
        raise errors.OpExecError("Can't add local storage to drbd")
5232

    
5233
      dev.children = new_lvs
5234
      cfg.Update(instance)
5235

    
5236
    # Step: wait for sync
5237

    
5238
    # this can fail as the old devices are degraded and _WaitForSync
5239
    # does a combined result over all disks, so we don't check its
5240
    # return value
5241
    self.proc.LogStep(5, steps_total, "sync devices")
5242
    _WaitForSync(self, instance, unlock=True)
5243

    
5244
    # so check manually all the devices
5245
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5246
      cfg.SetDiskID(dev, instance.primary_node)
5247
      result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5248
      msg = result.RemoteFailMsg()
5249
      if not msg and not result.payload:
5250
        msg = "disk not found"
5251
      if msg:
5252
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
5253
                                 (name, msg))
5254
      if result.payload[5]:
5255
        raise errors.OpExecError("DRBD device %s is degraded!" % name)
5256

    
5257
    # Step: remove old storage
5258
    self.proc.LogStep(6, steps_total, "removing old storage")
5259
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5260
      info("remove logical volumes for %s" % name)
5261
      for lv in old_lvs:
5262
        cfg.SetDiskID(lv, tgt_node)
5263
        msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5264
        if msg:
5265
          warning("Can't remove old LV: %s" % msg,
5266
                  hint="manually remove unused LVs")
5267
          continue
5268

    
5269
  def _ExecD8Secondary(self, feedback_fn):
5270
    """Replace the secondary node for drbd8.
5271

5272
    The algorithm for replace is quite complicated:
5273
      - for all disks of the instance:
5274
        - create new LVs on the new node with same names
5275
        - shutdown the drbd device on the old secondary
5276
        - disconnect the drbd network on the primary
5277
        - create the drbd device on the new secondary
5278
        - network attach the drbd on the primary, using an artifice:
5279
          the drbd code for Attach() will connect to the network if it
5280
          finds a device which is connected to the good local disks but
5281
          not network enabled
5282
      - wait for sync across all devices
5283
      - remove all disks from the old secondary
5284

5285
    Failures are not very well handled.
5286

5287
    """
5288
    steps_total = 6
5289
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5290
    instance = self.instance
5291
    iv_names = {}
5292
    # start of work
5293
    cfg = self.cfg
5294
    old_node = self.tgt_node
5295
    new_node = self.new_node
5296
    pri_node = instance.primary_node
5297
    nodes_ip = {
5298
      old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5299
      new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5300
      pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5301
      }
5302

    
5303
    # Step: check device activation
5304
    self.proc.LogStep(1, steps_total, "check device existence")
5305
    info("checking volume groups")
5306
    my_vg = cfg.GetVGName()
5307
    results = self.rpc.call_vg_list([pri_node, new_node])
5308
    for node in pri_node, new_node:
5309
      res = results[node]
5310
      if res.failed or not res.data or my_vg not in res.data:
5311
        raise errors.OpExecError("Volume group '%s' not found on %s" %
5312
                                 (my_vg, node))
5313
    for idx, dev in enumerate(instance.disks):
5314
      if idx not in self.op.disks:
5315
        continue
5316
      info("checking disk/%d on %s" % (idx, pri_node))
5317
      cfg.SetDiskID(dev, pri_node)
5318
      result = self.rpc.call_blockdev_find(pri_node, dev)
5319
      msg = result.RemoteFailMsg()
5320
      if not msg and not result.payload:
5321
        msg = "disk not found"
5322
      if msg:
5323
        raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5324
                                 (idx, pri_node, msg))
5325

    
5326
    # Step: check other node consistency
5327
    self.proc.LogStep(2, steps_total, "check peer consistency")
5328
    for idx, dev in enumerate(instance.disks):
5329
      if idx not in self.op.disks:
5330
        continue
5331
      info("checking disk/%d consistency on %s" % (idx, pri_node))
5332
      if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5333
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
5334
                                 " unsafe to replace the secondary" %
5335
                                 pri_node)
5336

    
5337
    # Step: create new storage
5338
    self.proc.LogStep(3, steps_total, "allocate new storage")
5339
    for idx, dev in enumerate(instance.disks):
5340
      info("adding new local storage on %s for disk/%d" %
5341
           (new_node, idx))
5342
      # we pass force_create=True to force LVM creation
5343
      for new_lv in dev.children:
5344
        _CreateBlockDev(self, new_node, instance, new_lv, True,
5345
                        _GetInstanceInfoText(instance), False)
5346

    
5347
    # Step 4: drbd minors and drbd setup changes
5348
    # after this, we must manually remove the drbd minors on both the
5349
    # error and the success paths
5350
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5351
                                   instance.name)
5352
    logging.debug("Allocated minors %s" % (minors,))
5353
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
5354
    for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5355
      size = dev.size
5356
      info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5357
      # create new devices on new_node; note that we create two IDs:
5358
      # one without port, so the drbd will be activated without
5359
      # networking information on the new node at this stage, and one
5360
      # with network, for the latter activation in step 4
5361
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5362
      if pri_node == o_node1:
5363
        p_minor = o_minor1
5364
      else:
5365
        p_minor = o_minor2
5366

    
5367
      new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5368
      new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5369

    
5370
      iv_names[idx] = (dev, dev.children, new_net_id)
5371
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5372
                    new_net_id)
5373
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5374
                              logical_id=new_alone_id,
5375
                              children=dev.children)
5376
      try:
5377
        _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5378
                              _GetInstanceInfoText(instance), False)
5379
      except errors.GenericError:
5380
        self.cfg.ReleaseDRBDMinors(instance.name)
5381
        raise
5382

    
5383
    for idx, dev in enumerate(instance.disks):
5384
      # we have new devices, shutdown the drbd on the old secondary
5385
      info("shutting down drbd for disk/%d on old node" % idx)
5386
      cfg.SetDiskID(dev, old_node)
5387
      msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5388
      if msg:
5389
        warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5390
                (idx, msg),
5391
                hint="Please cleanup this device manually as soon as possible")
5392

    
5393
    info("detaching primary drbds from the network (=> standalone)")
5394
    result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5395
                                               instance.disks)[pri_node]
5396

    
5397
    msg = result.RemoteFailMsg()
5398
    if msg:
5399
      # detaches didn't succeed (unlikely)
5400
      self.cfg.ReleaseDRBDMinors(instance.name)
5401
      raise errors.OpExecError("Can't detach the disks from the network on"
5402
                               " old node: %s" % (msg,))
5403

    
5404
    # if we managed to detach at least one, we update all the disks of
5405
    # the instance to point to the new secondary
5406
    info("updating instance configuration")
5407
    for dev, _, new_logical_id in iv_names.itervalues():
5408
      dev.logical_id = new_logical_id
5409
      cfg.SetDiskID(dev, pri_node)
5410
    cfg.Update(instance)
5411

    
5412
    # and now perform the drbd attach
5413
    info("attaching primary drbds to new secondary (standalone => connected)")
5414
    result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5415
                                           instance.disks, instance.name,
5416
                                           False)
5417
    for to_node, to_result in result.items():
5418
      msg = to_result.RemoteFailMsg()
5419
      if msg:
5420
        warning("can't attach drbd disks on node %s: %s", to_node, msg,
5421
                hint="please do a gnt-instance info to see the"
5422
                " status of disks")
5423

    
5424
    # this can fail as the old devices are degraded and _WaitForSync
5425
    # does a combined result over all disks, so we don't check its
5426
    # return value
5427
    self.proc.LogStep(5, steps_total, "sync devices")
5428
    _WaitForSync(self, instance, unlock=True)
5429

    
5430
    # so check manually all the devices
5431
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
5432
      cfg.SetDiskID(dev, pri_node)
5433
      result = self.rpc.call_blockdev_find(pri_node, dev)
5434
      msg = result.RemoteFailMsg()
5435
      if not msg and not result.payload:
5436
        msg = "disk not found"
5437
      if msg:
5438
        raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5439
                                 (idx, msg))
5440
      if result.payload[5]:
5441
        raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5442

    
5443
    self.proc.LogStep(6, steps_total, "removing old storage")
5444
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
5445
      info("remove logical volumes for disk/%d" % idx)
5446
      for lv in old_lvs:
5447
        cfg.SetDiskID(lv, old_node)
5448
        msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5449
        if msg:
5450
          warning("Can't remove LV on old secondary: %s", msg,
5451
                  hint="Cleanup stale volumes by hand")
5452

    
5453
  def Exec(self, feedback_fn):
5454
    """Execute disk replacement.
5455

5456
    This dispatches the disk replacement to the appropriate handler.
5457

5458
    """
5459
    instance = self.instance
5460

    
5461
    # Activate the instance disks if we're replacing them on a down instance
5462
    if not instance.admin_up:
5463
      _StartInstanceDisks(self, instance, True)
5464

    
5465
    if self.op.mode == constants.REPLACE_DISK_CHG:
5466
      fn = self._ExecD8Secondary
5467
    else:
5468
      fn = self._ExecD8DiskOnly
5469

    
5470
    ret = fn(feedback_fn)
5471

    
5472
    # Deactivate the instance disks if we're replacing them on a down instance
5473
    if not instance.admin_up:
5474
      _SafeShutdownInstanceDisks(self, instance)
5475

    
5476
    return ret
5477

    
5478

    
5479
class LUGrowDisk(LogicalUnit):
5480
  """Grow a disk of an instance.
5481

5482
  """
5483
  HPATH = "disk-grow"
5484
  HTYPE = constants.HTYPE_INSTANCE
5485
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5486
  REQ_BGL = False
5487
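  # Example opcode for this LU (hypothetical values; assumes the usual
  # opcodes.OpGrowDisk class carrying the fields listed in _OP_REQP above):
  #
  #   opcodes.OpGrowDisk(instance_name="inst1.example.com",
  #                      disk=0, amount=2048, wait_for_sync=True)
  #
  # "amount" is the number of MiB to add to the given disk, checked against
  # vg_free in CheckPrereq below.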

    
5488
  def ExpandNames(self):
5489
    self._ExpandAndLockInstance()
5490
    self.needed_locks[locking.LEVEL_NODE] = []
5491
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5492

    
5493
  def DeclareLocks(self, level):
5494
    if level == locking.LEVEL_NODE:
5495
      self._LockInstancesNodes()
5496

    
5497
  def BuildHooksEnv(self):
5498
    """Build hooks env.
5499

5500
    This runs on the master, the primary and all the secondaries.
5501

5502
    """
5503
    env = {
5504
      "DISK": self.op.disk,
5505
      "AMOUNT": self.op.amount,
5506
      }
5507
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5508
    nl = [
5509
      self.cfg.GetMasterNode(),
5510
      self.instance.primary_node,
5511
      ]
5512
    return env, nl, nl
5513

    
5514
  def CheckPrereq(self):
5515
    """Check prerequisites.
5516

5517
    This checks that the instance is in the cluster.
5518

5519
    """
5520
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5521
    assert instance is not None, \
5522
      "Cannot retrieve locked instance %s" % self.op.instance_name
5523
    nodenames = list(instance.all_nodes)
5524
    for node in nodenames:
5525
      _CheckNodeOnline(self, node)
5526

    
5527

    
5528
    self.instance = instance
5529

    
5530
    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5531
      raise errors.OpPrereqError("Instance's disk layout does not support"
5532
                                 " growing.")
5533

    
5534
    self.disk = instance.FindDisk(self.op.disk)
5535

    
5536
    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5537
                                       instance.hypervisor)
5538
    for node in nodenames:
5539
      info = nodeinfo[node]
5540
      if info.failed or not info.data:
5541
        raise errors.OpPrereqError("Cannot get current information"
5542
                                   " from node '%s'" % node)
5543
      vg_free = info.data.get('vg_free', None)
5544
      if not isinstance(vg_free, int):
5545
        raise errors.OpPrereqError("Can't compute free disk space on"
5546
                                   " node %s" % node)
5547
      if self.op.amount > vg_free:
5548
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
5549
                                   " %d MiB available, %d MiB required" %
5550
                                   (node, vg_free, self.op.amount))
5551

    
5552
  def Exec(self, feedback_fn):
5553
    """Execute disk grow.
5554

5555
    """
5556
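    # The grow request is sent to every node that holds this disk (the
    # primary and, for DRBD, the secondary) so that all copies are resized
    # consistently; the new size is then recorded in the cluster
    # configuration and, if requested, the operation waits for the disks to
    # resync.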
    instance = self.instance
    disk = self.disk
    for node in instance.all_nodes:
      self.cfg.SetDiskID(disk, node)
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Grow request failed to node %s: %s" %
                                 (node, msg))
    disk.RecordGrow(self.op.amount)
    self.cfg.Update(instance)
    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, instance)
      if disk_abort:
        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
                             " status.\nPlease check the instance.")


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances", "static"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = self.cfg.ExpandInstanceName(name)
        if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" % name)
        self.wanted_names.append(full_name)
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
    else:
      self.wanted_names = None
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]
    return

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
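    # In static mode no RPC calls are made and the per-device status fields
    # stay None; otherwise blockdev_find is queried on the primary node and,
    # for DRBD devices, on the secondary.  Child devices are processed
    # recursively so the returned dict mirrors the device tree.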
    static = self.op.static
    if not static:
      self.cfg.SetDiskID(dev, instance.primary_node)
      dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
      if dev_pstatus.offline:
        dev_pstatus = None
      else:
        msg = dev_pstatus.RemoteFailMsg()
        if msg:
          raise errors.OpExecError("Can't compute disk status for %s: %s" %
                                   (instance.name, msg))
        dev_pstatus = dev_pstatus.payload
    else:
      dev_pstatus = None

    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode and not static:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
      if dev_sstatus.offline:
        dev_sstatus = None
      else:
        msg = dev_sstatus.RemoteFailMsg()
        if msg:
          raise errors.OpExecError("Can't compute disk status for %s: %s" %
                                   (instance.name, msg))
        dev_sstatus = dev_sstatus.payload
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      "mode": dev.mode,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}

    cluster = self.cfg.GetClusterInfo()

    for instance in self.wanted_instances:
      if not self.op.static:
        remote_info = self.rpc.call_instance_info(instance.primary_node,
                                                  instance.name,
                                                  instance.hypervisor)
        remote_info.Raise()
        remote_info = remote_info.data
        if remote_info and "state" in remote_info:
          remote_state = "up"
        else:
          remote_state = "down"
      else:
        remote_state = None
      if instance.admin_up:
        config_state = "up"
      else:
        config_state = "down"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "hypervisor": instance.hypervisor,
        "network_port": instance.network_port,
        "hv_instance": instance.hvparams,
        "hv_actual": cluster.FillHV(instance),
        "be_instance": instance.beparams,
        "be_actual": cluster.FillBE(instance),
        }

      result[instance.name] = idict

    return result


class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def CheckArguments(self):
    if not hasattr(self.op, 'nics'):
      self.op.nics = []
    if not hasattr(self.op, 'disks'):
      self.op.disks = []
    if not hasattr(self.op, 'beparams'):
      self.op.beparams = {}
    if not hasattr(self.op, 'hvparams'):
      self.op.hvparams = {}
    self.op.force = getattr(self.op, "force", False)
    if not (self.op.nics or self.op.disks or
            self.op.hvparams or self.op.beparams):
      raise errors.OpPrereqError("No changes submitted")

    # Disk validation
    disk_addremove = 0
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        disk_addremove += 1
        continue
      elif disk_op == constants.DDM_ADD:
        disk_addremove += 1
      else:
        if not isinstance(disk_op, int):
          raise errors.OpPrereqError("Invalid disk index")
      if disk_op == constants.DDM_ADD:
        mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
        if mode not in constants.DISK_ACCESS_SET:
          raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
        size = disk_dict.get('size', None)
        if size is None:
          raise errors.OpPrereqError("Required disk parameter size missing")
        try:
          size = int(size)
        except ValueError, err:
          raise errors.OpPrereqError("Invalid disk size parameter: %s" %
                                     str(err))
        disk_dict['size'] = size
      else:
        # modification of disk
        if 'size' in disk_dict:
          raise errors.OpPrereqError("Disk size change not possible, use"
                                     " grow-disk")

    if disk_addremove > 1:
      raise errors.OpPrereqError("Only one disk add or remove operation"
                                 " supported at a time")

    # NIC validation
    nic_addremove = 0
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        nic_addremove += 1
        continue
      elif nic_op == constants.DDM_ADD:
        nic_addremove += 1
      else:
        if not isinstance(nic_op, int):
          raise errors.OpPrereqError("Invalid nic index")

      # nic_dict should be a dict
      nic_ip = nic_dict.get('ip', None)
      if nic_ip is not None:
        if nic_ip.lower() == constants.VALUE_NONE:
          nic_dict['ip'] = None
        else:
          if not utils.IsValidIP(nic_ip):
            raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)

      if nic_op == constants.DDM_ADD:
        nic_bridge = nic_dict.get('bridge', None)
        if nic_bridge is None:
          nic_dict['bridge'] = self.cfg.GetDefBridge()
        nic_mac = nic_dict.get('mac', None)
        if nic_mac is None:
          nic_dict['mac'] = constants.VALUE_AUTO

      if 'mac' in nic_dict:
        nic_mac = nic_dict['mac']
        if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          if not utils.IsValidMac(nic_mac):
            raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
        if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
          raise errors.OpPrereqError("'auto' is not a valid MAC address when"
                                     " modifying an existing nic")

    if nic_addremove > 1:
      raise errors.OpPrereqError("Only one NIC add or remove operation"
                                 " supported at a time")
  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if constants.BE_MEMORY in self.be_new:
      args['memory'] = self.be_new[constants.BE_MEMORY]
    if constants.BE_VCPUS in self.be_new:
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
    # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
    # information at all.
    if self.op.nics:
      args['nics'] = []
      nic_override = dict(self.op.nics)
      for idx, nic in enumerate(self.instance.nics):
        if idx in nic_override:
          this_nic_override = nic_override[idx]
        else:
          this_nic_override = {}
        if 'ip' in this_nic_override:
          ip = this_nic_override['ip']
        else:
          ip = nic.ip
        if 'bridge' in this_nic_override:
          bridge = this_nic_override['bridge']
        else:
          bridge = nic.bridge
        if 'mac' in this_nic_override:
          mac = this_nic_override['mac']
        else:
          mac = nic.mac
        args['nics'].append((ip, bridge, mac))
      if constants.DDM_ADD in nic_override:
        ip = nic_override[constants.DDM_ADD].get('ip', None)
        bridge = nic_override[constants.DDM_ADD]['bridge']
        mac = nic_override[constants.DDM_ADD]['mac']
        args['nics'].append((ip, bridge, mac))
      elif constants.DDM_REMOVE in nic_override:
        del args['nics'][-1]

    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    force = self.force = self.op.force

    # checking the new params on the primary/secondary nodes

    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    pnode = instance.primary_node
    nodelist = list(instance.all_nodes)

    # hvparams processing
    if self.op.hvparams:
      i_hvdict = copy.deepcopy(instance.hvparams)
      for key, val in self.op.hvparams.iteritems():
        if val == constants.VALUE_DEFAULT:
          try:
            del i_hvdict[key]
          except KeyError:
            pass
        else:
          i_hvdict[key] = val
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
      hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
                                i_hvdict)
      # local check
      hypervisor.GetHypervisor(
        instance.hypervisor).CheckParameterSyntax(hv_new)
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
      self.hv_new = hv_new # the new actual values
      self.hv_inst = i_hvdict # the new dict (without defaults)
    else:
      self.hv_new = self.hv_inst = {}

    # beparams processing
    if self.op.beparams:
      i_bedict = copy.deepcopy(instance.beparams)
      for key, val in self.op.beparams.iteritems():
        if val == constants.VALUE_DEFAULT:
          try:
            del i_bedict[key]
          except KeyError:
            pass
        else:
          i_bedict[key] = val
      cluster = self.cfg.GetClusterInfo()
      utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
      be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
                                i_bedict)
      self.be_new = be_new # the new actual values
      self.be_inst = i_bedict # the new dict (without defaults)
    else:
      self.be_new = self.be_inst = {}

    self.warn = []

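    # When the memory size is being changed and the force flag is not set,
    # verify that the primary node (and, if auto_balance is enabled, the
    # secondary nodes) still has enough free memory for the instance.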
    if constants.BE_MEMORY in self.op.beparams and not self.force:
      mem_check_list = [pnode]
      if be_new[constants.BE_AUTO_BALANCE]:
        # either we changed auto_balance to yes or it was from before
        mem_check_list.extend(instance.secondary_nodes)
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
                                                  instance.hypervisor)
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
                                         instance.hypervisor)
      if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s" % pnode)
      else:
        if not instance_info.failed and instance_info.data:
          current_mem = int(instance_info.data['memory'])
        else:
          # Assume instance not running
          # (there is a slight race condition here, but it's not very probable,
          # and we have no other way to check)
          current_mem = 0
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
                    nodeinfo[pnode].data['memory_free'])
        if miss_mem > 0:
          raise errors.OpPrereqError("This change will prevent the instance"
                                     " from starting, due to %d MB of memory"
                                     " missing on its primary node" % miss_mem)

      if be_new[constants.BE_AUTO_BALANCE]:
        for node, nres in nodeinfo.iteritems():
          if node not in instance.secondary_nodes:
            continue
          if nres.failed or not isinstance(nres.data, dict):
            self.warn.append("Can't get info from secondary node %s" % node)
          elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
            self.warn.append("Not enough memory to failover instance to"
                             " secondary node %s" % node)

    # NIC processing
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        if not instance.nics:
          raise errors.OpPrereqError("Instance has no NICs, cannot remove")
        continue
      if nic_op != constants.DDM_ADD:
        # an existing nic
        if nic_op < 0 or nic_op >= len(instance.nics):
          raise errors.OpPrereqError("Invalid NIC index %s, valid values"
                                     " are 0 to %d" %
                                     (nic_op, len(instance.nics)))
      if 'bridge' in nic_dict:
        nic_bridge = nic_dict['bridge']
        if nic_bridge is None:
          raise errors.OpPrereqError('Cannot set the nic bridge to None')
        if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
          msg = ("Bridge '%s' doesn't exist on one of"
                 " the instance nodes" % nic_bridge)
          if self.force:
            self.warn.append(msg)
          else:
            raise errors.OpPrereqError(msg)
      if 'mac' in nic_dict:
        nic_mac = nic_dict['mac']
        if nic_mac is None:
          raise errors.OpPrereqError('Cannot set the nic mac to None')
        elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          # otherwise generate the mac
          nic_dict['mac'] = self.cfg.GenerateMAC()
        else:
          # or validate/reserve the current one
          if self.cfg.IsMacInUse(nic_mac):
            raise errors.OpPrereqError("MAC address %s already in use"
                                       " in cluster" % nic_mac)

    # DISK processing
    if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Disk operations not supported for"
                                 " diskless instances")
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        if len(instance.disks) == 1:
          raise errors.OpPrereqError("Cannot remove the last disk of"
                                     " an instance")
        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
        ins_l = ins_l[pnode]
        if ins_l.failed or not isinstance(ins_l.data, list):
          raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
        if instance.name in ins_l.data:
          raise errors.OpPrereqError("Instance is running, can't remove"
                                     " disks.")

      if (disk_op == constants.DDM_ADD and
          len(instance.disks) >= constants.MAX_DISKS):
        raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
                                   " add more" % constants.MAX_DISKS)
      if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
        # an existing disk
        if disk_op < 0 or disk_op >= len(instance.disks):
          raise errors.OpPrereqError("Invalid disk index %s, valid values"
                                     " are 0 to %d" %
                                     (disk_op, len(instance.disks)))

    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    # Process here the warnings from CheckPrereq, as we don't have a
    # feedback_fn there.
    for warn in self.warn:
      feedback_fn("WARNING: %s" % warn)

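    # Each change below is applied to the in-memory instance object and
    # recorded in 'result' as a (parameter, new value) pair; the modified
    # object is written back to the configuration at the end.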
    result = []
    instance = self.instance
    # disk changes
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        # remove the last disk
        device = instance.disks.pop()
        device_idx = len(instance.disks)
        for node, disk in device.ComputeNodeTree(instance.primary_node):
          self.cfg.SetDiskID(disk, node)
          msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
          if msg:
            self.LogWarning("Could not remove disk/%d on node %s: %s,"
                            " continuing anyway", device_idx, node, msg)
        result.append(("disk/%d" % device_idx, "remove"))
      elif disk_op == constants.DDM_ADD:
        # add a new disk
        if instance.disk_template == constants.DT_FILE:
          file_driver, file_path = instance.disks[0].logical_id
          file_path = os.path.dirname(file_path)
        else:
          file_driver = file_path = None
        disk_idx_base = len(instance.disks)
        new_disk = _GenerateDiskTemplate(self,
                                         instance.disk_template,
                                         instance.name, instance.primary_node,
                                         instance.secondary_nodes,
                                         [disk_dict],
                                         file_path,
                                         file_driver,
                                         disk_idx_base)[0]
        instance.disks.append(new_disk)
        info = _GetInstanceInfoText(instance)

        logging.info("Creating volume %s for instance %s",
                     new_disk.iv_name, instance.name)
        # Note: this needs to be kept in sync with _CreateDisks
        #HARDCODE
        for node in instance.all_nodes:
          f_create = node == instance.primary_node
          try:
            _CreateBlockDev(self, node, instance, new_disk,
                            f_create, info, f_create)
          except errors.OpExecError, err:
            self.LogWarning("Failed to create volume %s (%s) on"
                            " node %s: %s",
                            new_disk.iv_name, new_disk, node, err)
        result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
                       (new_disk.size, new_disk.mode)))
      else:
        # change a given disk
        instance.disks[disk_op].mode = disk_dict['mode']
        result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
    # NIC changes
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        # remove the last nic
        del instance.nics[-1]
        result.append(("nic.%d" % len(instance.nics), "remove"))
      elif nic_op == constants.DDM_ADD:
        # mac and bridge should be set, by now
        mac = nic_dict['mac']
        bridge = nic_dict['bridge']
        new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
                              bridge=bridge)
        instance.nics.append(new_nic)
        result.append(("nic.%d" % (len(instance.nics) - 1),
                       "add:mac=%s,ip=%s,bridge=%s" %
                       (new_nic.mac, new_nic.ip, new_nic.bridge)))
      else:
        # change a given nic
        for key in 'mac', 'ip', 'bridge':
          if key in nic_dict:
            setattr(instance.nics[nic_op], key, nic_dict[key])
            result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))

    # hvparams changes
    if self.op.hvparams:
      instance.hvparams = self.hv_inst
      for key, val in self.op.hvparams.iteritems():
        result.append(("hv/%s" % key, val))

    # beparams changes
    if self.op.beparams:
      instance.beparams = self.be_inst
      for key, val in self.op.beparams.iteritems():
        result.append(("be/%s" % key, val))

    self.cfg.Update(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = ['nodes']
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    @rtype: dict
    @return: a dictionary with the structure node->(export-list)
        where export-list is a list of the instances exported on
        that node.

    """
    rpcresult = self.rpc.call_export_list(self.nodes)
    result = {}
    for node in rpcresult:
      if rpcresult[node].failed:
        result[node] = False
      else:
        result[node] = rpcresult[node].data

    return result


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
          "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

    self.dst_node = self.cfg.GetNodeInfo(
      self.cfg.ExpandNodeName(self.op.target_node))

    if self.dst_node is None:
      # This is a wrong node name, not a non-locked node
      raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
    _CheckNodeOnline(self, self.dst_node.name)
    _CheckNodeNotDrained(self, self.dst_node.name)

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
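    # The export proceeds in several steps: optionally shut the instance
    # down, snapshot every disk on the primary node, copy the snapshots to
    # the target node, remove the snapshots, and finally clean up any older
    # export of the same instance kept on other nodes.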
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      result = self.rpc.call_instance_shutdown(src_node, instance)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, src_node, msg))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    # set the disks ID correctly since call_instance_start needs the
    # correct drbd minor to create the symlinks
    for disk in instance.disks:
      self.cfg.SetDiskID(disk, src_node)

    try:
      for disk in instance.disks:
        # new_dev_name will be a snapshot of an lvm leaf of the one we passed
        new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
        if new_dev_name.failed or not new_dev_name.data:
          self.LogWarning("Could not snapshot block device %s on node %s",
                          disk.logical_id[1], src_node)
          snap_disks.append(False)
        else:
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                 logical_id=(vgname, new_dev_name.data),
                                 physical_id=(vgname, new_dev_name.data),
                                 iv_name=disk.iv_name)
          snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.admin_up:
        result = self.rpc.call_instance_start(src_node, instance, None, None)
        msg = result.RemoteFailMsg()
        if msg:
          _ShutdownInstanceDisks(self, instance)
          raise errors.OpExecError("Could not start instance: %s" % msg)

    # TODO: check for size

    cluster_name = self.cfg.GetClusterName()
    for idx, dev in enumerate(snap_disks):
      if dev:
        result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                               instance, cluster_name, idx)
        if result.failed or not result.data:
          self.LogWarning("Could not export block device %s from node %s to"
                          " node %s", dev.logical_id[1], src_node,
                          dst_node.name)
        msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
        if msg:
          self.LogWarning("Could not remove snapshot block device %s from node"
                          " %s: %s", dev.logical_id[1], src_node, msg)

    result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
    if result.failed or not result.data:
      self.LogWarning("Could not finalize export for instance %s on node %s",
                      instance.name, dst_node.name)

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      exportlist = self.rpc.call_export_list(nodelist)
      for node in exportlist:
        if exportlist[node].failed:
          continue
        if instance.name in exportlist[node].data:
          if not self.rpc.call_export_remove(node, instance.name):
            self.LogWarning("Could not remove older export for instance %s"
                            " on node %s", instance.name, node)


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    # We need all nodes to be locked in order for RemoveExport to work, but we
    # don't need to lock the instance itself, as nothing will happen to it (and
    # we can remove exports also for a removed instance)
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def CheckPrereq(self):
    """Check prerequisites.
    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    exportlist = self.rpc.call_export_list(self.acquired_locks[
      locking.LEVEL_NODE])
    found = False
    for node in exportlist:
      if exportlist[node].failed:
        self.LogWarning("Failed to query node %s, continuing" % node)
        continue
      if instance_name in exportlist[node].data:
        found = True
        result = self.rpc.call_export_remove(node, instance_name)
        if result.failed or not result.data:
          logging.error("Could not remove export for instance %s"
                        " on node %s", instance_name, node)

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        node_result.Raise()
        if not node_result.data:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result.data))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus", "hypervisor",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, lu, mode, name, **kwargs):
    self.lu = lu
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.hypervisor = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
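    # The resulting structure is the generic part of the allocator input:
    # cluster-wide data, a "nodes" dict with static and dynamic
    # (memory/disk/cpu) information per node, and an "instances" dict; the
    # mode-specific "request" entry is added later by _AddNewInstance or
    # _AddRelocateInstance.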
    cfg = self.lu.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": constants.IALLOCATOR_VERSION,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
      # we don't have job IDs
      }
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      hypervisor_name = self.hypervisor
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor

    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
                                           hypervisor_name)
    node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
                       cluster_info.enabled_hypervisors)
    for nname, nresult in node_data.items():
      # first fill in static (config-based) values
      ninfo = cfg.GetNodeInfo(nname)
      pnr = {
        "tags": list(ninfo.GetTags()),
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "offline": ninfo.offline,
        "drained": ninfo.drained,
        "master_candidate": ninfo.master_candidate,
        }

      if not ninfo.offline:
        nresult.Raise()
        if not isinstance(nresult.data, dict):
          raise errors.OpExecError("Can't get data for node %s" % nname)
        remote_info = nresult.data
        for attr in ['memory_total', 'memory_free', 'memory_dom0',
                     'vg_size', 'vg_free', 'cpu_total']:
          if attr not in remote_info:
            raise errors.OpExecError("Node '%s' didn't return attribute"
                                     " '%s'" % (nname, attr))
          try:
            remote_info[attr] = int(remote_info[attr])
          except ValueError, err:
            raise errors.OpExecError("Node '%s' returned invalid value"
                                     " for '%s': %s" % (nname, attr, err))
        # compute memory used by primary instances
        i_p_mem = i_p_up_mem = 0
        for iinfo, beinfo in i_list:
          if iinfo.primary_node == nname:
            i_p_mem += beinfo[constants.BE_MEMORY]
            if iinfo.name not in node_iinfo[nname].data:
              i_used_mem = 0
            else:
              i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
            i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
            remote_info['memory_free'] -= max(0, i_mem_diff)

            if iinfo.admin_up:
              i_p_up_mem += beinfo[constants.BE_MEMORY]

        # compute memory used by instances
        pnr_dyn = {
          "total_memory": remote_info['memory_total'],
          "reserved_memory": remote_info['memory_dom0'],
          "free_memory": remote_info['memory_free'],
          "total_disk": remote_info['vg_size'],
          "free_disk": remote_info['vg_free'],
          "total_cpus": remote_info['cpu_total'],
          "i_pri_memory": i_p_mem,
          "i_pri_up_memory": i_p_up_mem,
          }
        pnr.update(pnr_dyn)

      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "admin_up": iinfo.admin_up,
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
                                                 pir["disks"])
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
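    # An allocation request asks for two target nodes for network-mirrored
    # (DRBD) disk templates and for one node otherwise; the total disk
    # space is derived from the requested disks via _ComputeDiskSize.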
    data = self.in_data

    disk_space = _ComputeDiskSize(self.disk_template, self.disks)

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.lu.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance has not exactly one secondary node")

    self.required_nodes = 1
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.lu.rpc.call_iallocator_runner
    data = self.in_text

    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
    result.Raise()

    if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result.data

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict


class LUTestAllocator(NoHooksLU):
6912
  """Run allocator tests.
6913

6914
  This LU runs the allocator tests
6915

6916
  """
6917
  _OP_REQP = ["direction", "mode", "name"]
6918

    
6919
  def CheckPrereq(self):
6920
    """Check prerequisites.
6921

6922
    This checks the opcode parameters depending on the director and mode test.
6923

6924
    """
6925
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
      if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    else:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result