root / lib / cmdlib.py @ 4c4e4e1e

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the master-side code."""
23

    
24
# pylint: disable-msg=W0613,W0201
25

    
26
import os
27
import os.path
28
import time
29
import tempfile
30
import re
31
import platform
32
import logging
33
import copy
34
import random
35

    
36
from ganeti import ssh
37
from ganeti import utils
38
from ganeti import errors
39
from ganeti import hypervisor
40
from ganeti import locking
41
from ganeti import constants
42
from ganeti import objects
43
from ganeti import opcodes
44
from ganeti import serializer
45
from ganeti import ssconf
46

    
47

    
48
class LogicalUnit(object):
49
  """Logical Unit base class.
50

51
  Subclasses must follow these rules:
52
    - implement ExpandNames
53
    - implement CheckPrereq
54
    - implement Exec
55
    - implement BuildHooksEnv
56
    - redefine HPATH and HTYPE
57
    - optionally redefine their run requirements:
58
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59

60
  Note that all commands require root permissions.
61

62
  """
63
  HPATH = None
64
  HTYPE = None
65
  _OP_REQP = []
66
  REQ_BGL = True
67

    
68
  def __init__(self, processor, op, context, rpc):
69
    """Constructor for LogicalUnit.
70

71
    This needs to be overridden in derived classes in order to check op
72
    validity.
73

74
    """
75
    self.proc = processor
76
    self.op = op
77
    self.cfg = context.cfg
78
    self.context = context
79
    self.rpc = rpc
80
    # Dicts used to declare locking needs to mcpu
81
    self.needed_locks = None
82
    self.acquired_locks = {}
83
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
84
    self.add_locks = {}
85
    self.remove_locks = {}
86
    # Used to force good behavior when calling helper functions
87
    self.recalculate_locks = {}
88
    self.__ssh = None
89
    # logging
90
    self.LogWarning = processor.LogWarning
91
    self.LogInfo = processor.LogInfo
92

    
93
    for attr_name in self._OP_REQP:
94
      attr_val = getattr(op, attr_name, None)
95
      if attr_val is None:
96
        raise errors.OpPrereqError("Required parameter '%s' missing" %
97
                                   attr_name)
98
    self.CheckArguments()
99

    
100
  def __GetSSH(self):
101
    """Returns the SshRunner object
102

103
    """
104
    if not self.__ssh:
105
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
106
    return self.__ssh
107

    
108
  ssh = property(fget=__GetSSH)
109

    
110
  def CheckArguments(self):
111
    """Check syntactic validity for the opcode arguments.
112

113
    This method is for doing a simple syntactic check and ensuring the
114
    validity of opcode parameters, without any cluster-related
115
    checks. While the same can be accomplished in ExpandNames and/or
116
    CheckPrereq, doing these checks separately is better because:
117

118
      - ExpandNames is left as purely a lock-related function
119
      - CheckPrereq is run after we have acquired locks (and possibly
120
        waited for them)
121

122
    The function is allowed to change the self.op attribute so that
123
    later methods no longer need to worry about missing parameters.
124

125
    """
126
    pass
127

    
128
  def ExpandNames(self):
129
    """Expand names for this LU.
130

131
    This method is called before starting to execute the opcode, and it should
132
    update all the parameters of the opcode to their canonical form (e.g. a
133
    short node name must be fully expanded after this method has successfully
134
    completed). This way locking, hooks, logging, etc. can work correctly.
135

136
    LUs which implement this method must also populate the self.needed_locks
137
    member, as a dict with lock levels as keys, and a list of needed lock names
138
    as values. Rules:
139

140
      - use an empty dict if you don't need any lock
141
      - if you don't need any lock at a particular level omit that level
142
      - don't put anything for the BGL level
143
      - if you want all locks at a level use locking.ALL_SET as a value
144

145
    If you need to share locks (rather than acquire them exclusively) at one
146
    level you can modify self.share_locks, setting a true value (usually 1) for
147
    that level. By default locks are not shared.
148

149
    Examples::
150

151
      # Acquire all nodes and one instance
152
      self.needed_locks = {
153
        locking.LEVEL_NODE: locking.ALL_SET,
154
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
155
      }
156
      # Acquire just two nodes
157
      self.needed_locks = {
158
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
159
      }
160
      # Acquire no locks
161
      self.needed_locks = {} # No, you can't leave it to the default value None
162

163
    """
164
    # The implementation of this method is mandatory only if the new LU is
165
    # concurrent, so that old LUs don't need to be changed all at the same
166
    # time.
167
    if self.REQ_BGL:
168
      self.needed_locks = {} # Exclusive LUs don't need locks.
169
    else:
170
      raise NotImplementedError
171

    
172
  def DeclareLocks(self, level):
173
    """Declare LU locking needs for a level
174

175
    While most LUs can just declare their locking needs at ExpandNames time,
176
    sometimes there's the need to calculate some locks after having acquired
177
    the ones before. This function is called just before acquiring locks at a
178
    particular level, but after acquiring the ones at lower levels, and permits
179
    such calculations. It can be used to modify self.needed_locks, and by
180
    default it does nothing.
181

182
    This function is only called if you have something already set in
183
    self.needed_locks for the level.
184

185
    @param level: Locking level which is going to be locked
186
    @type level: member of ganeti.locking.LEVELS
187

188
    """
189
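  # Illustrative example (not part of the original code): an LU that locks a
  # set of instances in ExpandNames and only computes the node locks once the
  # instance locks are held would typically combine ExpandNames and
  # DeclareLocks like this, using the helpers defined further down in this
  # class:
  #
  #   def ExpandNames(self):
  #     self._ExpandAndLockInstance()
  #     self.needed_locks[locking.LEVEL_NODE] = []
  #     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
  #
  #   def DeclareLocks(self, level):
  #     if level == locking.LEVEL_NODE:
  #       self._LockInstancesNodes()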

    
190
  def CheckPrereq(self):
191
    """Check prerequisites for this LU.
192

193
    This method should check that the prerequisites for the execution
194
    of this LU are fulfilled. It can do internode communication, but
195
    it should be idempotent - no cluster or system changes are
196
    allowed.
197

198
    The method should raise errors.OpPrereqError in case something is
199
    not fulfilled. Its return value is ignored.
200

201
    This method should also update all the parameters of the opcode to
202
    their canonical form if it hasn't been done by ExpandNames before.
203

204
    """
205
    raise NotImplementedError
206

    
207
  def Exec(self, feedback_fn):
208
    """Execute the LU.
209

210
    This method should implement the actual work. It should raise
211
    errors.OpExecError for failures that are somewhat dealt with in
212
    code, or expected.
213

214
    """
215
    raise NotImplementedError
216

    
217
  def BuildHooksEnv(self):
218
    """Build hooks environment for this LU.
219

220
    This method should return a three-element tuple consisting of: a dict
221
    containing the environment that will be used for running the
222
    specific hook for this LU, a list of node names on which the hook
223
    should run before the execution, and a list of node names on which
224
    the hook should run after the execution.
225

226
    The keys of the dict must not have the 'GANETI_' prefix, as this will
227
    be handled in the hooks runner. Also note additional keys will be
228
    added by the hooks runner. If the LU doesn't define any
229
    environment, an empty dict (and not None) should be returned.
230

231
    If no nodes are needed, an empty list (and not None) should be returned.
232

233
    Note that if the HPATH for a LU class is None, this function will
234
    not be called.
235

236
    """
237
    raise NotImplementedError
238

    
239
  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
240
    """Notify the LU about the results of its hooks.
241

242
    This method is called every time a hooks phase is executed, and notifies
243
    the Logical Unit about the hooks' result. The LU can then use it to alter
244
    its result based on the hooks.  By default the method does nothing and the
245
    previous result is passed back unchanged but any LU can define it if it
246
    wants to use the local cluster hook-scripts somehow.
247

248
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
249
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
250
    @param hook_results: the results of the multi-node hooks rpc call
251
    @param feedback_fn: function used to send feedback back to the caller
252
    @param lu_result: the previous Exec result this LU had, or None
253
        in the PRE phase
254
    @return: the new Exec result, based on the previous result
255
        and hook results
256

257
    """
258
    return lu_result
259

    
260
  def _ExpandAndLockInstance(self):
261
    """Helper function to expand and lock an instance.
262

263
    Many LUs that work on an instance take its name in self.op.instance_name
264
    and need to expand it and then declare the expanded name for locking. This
265
    function does it, and then updates self.op.instance_name to the expanded
266
    name. It also initializes needed_locks as a dict, if this hasn't been done
267
    before.
268

269
    """
270
    if self.needed_locks is None:
271
      self.needed_locks = {}
272
    else:
273
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
274
        "_ExpandAndLockInstance called with instance-level locks set"
275
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
276
    if expanded_name is None:
277
      raise errors.OpPrereqError("Instance '%s' not known" %
278
                                  self.op.instance_name)
279
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
280
    self.op.instance_name = expanded_name
281

    
282
  def _LockInstancesNodes(self, primary_only=False):
283
    """Helper function to declare instances' nodes for locking.
284

285
    This function should be called after locking one or more instances to lock
286
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
287
    with all primary or secondary nodes for instances already locked and
288
    present in self.needed_locks[locking.LEVEL_INSTANCE].
289

290
    It should be called from DeclareLocks, and for safety only works if
291
    self.recalculate_locks[locking.LEVEL_NODE] is set.
292

293
    In the future it may grow parameters to just lock some instances' nodes, or
294
    to just lock primary or secondary nodes, if needed.
295

296
    It should be called in DeclareLocks in a way similar to::
297

298
      if level == locking.LEVEL_NODE:
299
        self._LockInstancesNodes()
300

301
    @type primary_only: boolean
302
    @param primary_only: only lock primary nodes of locked instances
303

304
    """
305
    assert locking.LEVEL_NODE in self.recalculate_locks, \
306
      "_LockInstancesNodes helper function called with no nodes to recalculate"
307

    
308
    # TODO: check if we've really been called with the instance locks held
309

    
310
    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
311
    # future we might want to have different behaviors depending on the value
312
    # of self.recalculate_locks[locking.LEVEL_NODE]
313
    wanted_nodes = []
314
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
315
      instance = self.context.cfg.GetInstanceInfo(instance_name)
316
      wanted_nodes.append(instance.primary_node)
317
      if not primary_only:
318
        wanted_nodes.extend(instance.secondary_nodes)
319

    
320
    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
321
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
322
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
323
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
324

    
325
    del self.recalculate_locks[locking.LEVEL_NODE]
326

    
327

    
328
class NoHooksLU(LogicalUnit):
329
  """Simple LU which runs no hooks.
330

331
  This LU is intended as a parent for other LogicalUnits which will
332
  run no hooks, in order to reduce duplicate code.
333

334
  """
335
  HPATH = None
336
  HTYPE = None
337

    
338
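# Illustrative sketch (not part of the original code): a minimal logical unit
# following the rules from the LogicalUnit docstring above. The opcode field
# "message" is hypothetical and only shows the expected structure; real LUs
# are driven by the opcodes defined in ganeti.opcodes.
class _LUExampleSketch(NoHooksLU):
  """Example no-op logical unit (illustration only)."""
  _OP_REQP = ["message"]
  REQ_BGL = False

  def ExpandNames(self):
    # this example needs no locks at all, hence the empty dict (and not None)
    self.needed_locks = {}

  def CheckPrereq(self):
    # no cluster-state prerequisites to verify in this example
    pass

  def Exec(self, feedback_fn):
    feedback_fn("example message: %s" % self.op.message)
    return self.op.message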

    
339
def _GetWantedNodes(lu, nodes):
340
  """Returns list of checked and expanded node names.
341

342
  @type lu: L{LogicalUnit}
343
  @param lu: the logical unit on whose behalf we execute
344
  @type nodes: list
345
  @param nodes: list of node names or None for all nodes
346
  @rtype: list
347
  @return: the list of nodes, sorted
348
  @raise errors.OpPrereqError: if the nodes parameter is wrong type
349

350
  """
351
  if not isinstance(nodes, list):
352
    raise errors.OpPrereqError("Invalid argument type 'nodes'")
353

    
354
  if not nodes:
355
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
356
      " non-empty list of nodes whose name is to be expanded.")
357

    
358
  wanted = []
359
  for name in nodes:
360
    node = lu.cfg.ExpandNodeName(name)
361
    if node is None:
362
      raise errors.OpPrereqError("No such node name '%s'" % name)
363
    wanted.append(node)
364

    
365
  return utils.NiceSort(wanted)
366

    
367

    
368
def _GetWantedInstances(lu, instances):
369
  """Returns list of checked and expanded instance names.
370

371
  @type lu: L{LogicalUnit}
372
  @param lu: the logical unit on whose behalf we execute
373
  @type instances: list
374
  @param instances: list of instance names or None for all instances
375
  @rtype: list
376
  @return: the list of instances, sorted
377
  @raise errors.OpPrereqError: if the instances parameter is wrong type
378
  @raise errors.OpPrereqError: if any of the passed instances is not found
379

380
  """
381
  if not isinstance(instances, list):
382
    raise errors.OpPrereqError("Invalid argument type 'instances'")
383

    
384
  if instances:
385
    wanted = []
386

    
387
    for name in instances:
388
      instance = lu.cfg.ExpandInstanceName(name)
389
      if instance is None:
390
        raise errors.OpPrereqError("No such instance name '%s'" % name)
391
      wanted.append(instance)
392

    
393
  else:
394
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
395
  return wanted
396

    
397

    
398
def _CheckOutputFields(static, dynamic, selected):
399
  """Checks whether all selected fields are valid.
400

401
  @type static: L{utils.FieldSet}
402
  @param static: static fields set
403
  @type dynamic: L{utils.FieldSet}
404
  @param dynamic: dynamic fields set
405

406
  """
407
  f = utils.FieldSet()
408
  f.Extend(static)
409
  f.Extend(dynamic)
410

    
411
  delta = f.NonMatching(selected)
412
  if delta:
413
    raise errors.OpPrereqError("Unknown output fields selected: %s"
414
                               % ",".join(delta))
415

    
416
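# Illustrative usage (an assumption, not taken from the original code): a
# query LU would typically declare its known fields as utils.FieldSet objects
# and validate the user-requested output fields like this:
#
#   _CheckOutputFields(static=utils.FieldSet("name", "pinst_cnt"),
#                      dynamic=utils.FieldSet("dtotal", "dfree"),
#                      selected=self.op.output_fields)
#
# which raises errors.OpPrereqError if any selected field is unknown.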

    
417
def _CheckBooleanOpField(op, name):
418
  """Validates boolean opcode parameters.
419

420
  This will ensure that an opcode parameter is either a boolean value,
421
  or None (but that it always exists).
422

423
  """
424
  val = getattr(op, name, None)
425
  if not (val is None or isinstance(val, bool)):
426
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
427
                               (name, str(val)))
428
  setattr(op, name, val)
429

    
430

    
431
def _CheckNodeOnline(lu, node):
432
  """Ensure that a given node is online.
433

434
  @param lu: the LU on behalf of which we make the check
435
  @param node: the node to check
436
  @raise errors.OpPrereqError: if the node is offline
437

438
  """
439
  if lu.cfg.GetNodeInfo(node).offline:
440
    raise errors.OpPrereqError("Can't use offline node %s" % node)
441

    
442

    
443
def _CheckNodeNotDrained(lu, node):
444
  """Ensure that a given node is not drained.
445

446
  @param lu: the LU on behalf of which we make the check
447
  @param node: the node to check
448
  @raise errors.OpPrereqError: if the node is drained
449

450
  """
451
  if lu.cfg.GetNodeInfo(node).drained:
452
    raise errors.OpPrereqError("Can't use drained node %s" % node)
453

    
454

    
455
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
456
                          memory, vcpus, nics, disk_template, disks):
457
  """Builds instance related env variables for hooks
458

459
  This builds the hook environment from individual variables.
460

461
  @type name: string
462
  @param name: the name of the instance
463
  @type primary_node: string
464
  @param primary_node: the name of the instance's primary node
465
  @type secondary_nodes: list
466
  @param secondary_nodes: list of secondary nodes as strings
467
  @type os_type: string
468
  @param os_type: the name of the instance's OS
469
  @type status: boolean
470
  @param status: the should_run status of the instance
471
  @type memory: string
472
  @param memory: the memory size of the instance
473
  @type vcpus: string
474
  @param vcpus: the count of VCPUs the instance has
475
  @type nics: list
476
  @param nics: list of tuples (ip, mac, mode, link) representing
477
      the NICs the instance  has
478
  @type disk_template: string
479
  @param disk_template: the disk template of the instance
480
  @type disks: list
481
  @param disks: the list of (size, mode) pairs
482
  @rtype: dict
483
  @return: the hook environment for this instance
484

485
  """
486
  if status:
487
    str_status = "up"
488
  else:
489
    str_status = "down"
490
  env = {
491
    "OP_TARGET": name,
492
    "INSTANCE_NAME": name,
493
    "INSTANCE_PRIMARY": primary_node,
494
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
495
    "INSTANCE_OS_TYPE": os_type,
496
    "INSTANCE_STATUS": str_status,
497
    "INSTANCE_MEMORY": memory,
498
    "INSTANCE_VCPUS": vcpus,
499
    "INSTANCE_DISK_TEMPLATE": disk_template,
500
  }
501

    
502
  if nics:
503
    nic_count = len(nics)
504
    for idx, (ip, mac, mode, link) in enumerate(nics):
505
      if ip is None:
506
        ip = ""
507
      env["INSTANCE_NIC%d_IP" % idx] = ip
508
      env["INSTANCE_NIC%d_MAC" % idx] = mac
509
      env["INSTANCE_NIC%d_MODE" % idx] = mode
510
      env["INSTANCE_NIC%d_LINK" % idx] = link
511
      if mode == constants.NIC_MODE_BRIDGED:
512
        env["INSTANCE_NIC%d_BRIDGE" % idx] = link
513
  else:
514
    nic_count = 0
515

    
516
  env["INSTANCE_NIC_COUNT"] = nic_count
517

    
518
  if disks:
519
    disk_count = len(disks)
520
    for idx, (size, mode) in enumerate(disks):
521
      env["INSTANCE_DISK%d_SIZE" % idx] = size
522
      env["INSTANCE_DISK%d_MODE" % idx] = mode
523
  else:
524
    disk_count = 0
525

    
526
  env["INSTANCE_DISK_COUNT"] = disk_count
527

    
528
  return env
529
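# Illustrative example (the values are hypothetical): for an instance with one
# bridged NIC and one disk, a call such as
#
#   _BuildInstanceHookEnv("inst1.example.com", "node1", ["node2"], "debian",
#                         True, 128, 1,
#                         [("198.51.100.10", "aa:00:00:12:34:56",
#                           constants.NIC_MODE_BRIDGED, "xen-br0")],
#                         constants.DT_DRBD8, [(1024, "rw")])
#
# returns a dictionary containing, among others, INSTANCE_STATUS="up",
# INSTANCE_NIC_COUNT=1, INSTANCE_NIC0_BRIDGE="xen-br0", INSTANCE_DISK_COUNT=1
# and INSTANCE_DISK0_SIZE=1024.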

    
530
def _PreBuildNICHooksList(lu, nics):
531
  """Build a list of nic information tuples.
532

533
  This list is suitable to be passed to _BuildInstanceHookEnv.
534

535
  @type lu:  L{LogicalUnit}
536
  @param lu: the logical unit on whose behalf we execute
537
  @type nics: list of L{objects.NIC}
538
  @param nics: list of nics to convert to hooks tuples
539

540
  """
541
  hooks_nics = []
542
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
543
  for nic in nics:
544
    ip = nic.ip
545
    mac = nic.mac
546
    filled_params = objects.FillDict(c_nicparams, nic.nicparams)
547
    mode = filled_params[constants.NIC_MODE]
548
    link = filled_params[constants.NIC_LINK]
549
    hooks_nics.append((ip, mac, mode, link))
550
  return hooks_nics
551

    
552
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
553
  """Builds instance related env variables for hooks from an object.
554

555
  @type lu: L{LogicalUnit}
556
  @param lu: the logical unit on whose behalf we execute
557
  @type instance: L{objects.Instance}
558
  @param instance: the instance for which we should build the
559
      environment
560
  @type override: dict
561
  @param override: dictionary with key/values that will override
562
      our values
563
  @rtype: dict
564
  @return: the hook environment dictionary
565

566
  """
567
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
568
  args = {
569
    'name': instance.name,
570
    'primary_node': instance.primary_node,
571
    'secondary_nodes': instance.secondary_nodes,
572
    'os_type': instance.os,
573
    'status': instance.admin_up,
574
    'memory': bep[constants.BE_MEMORY],
575
    'vcpus': bep[constants.BE_VCPUS],
576
    'nics': _PreBuildNICHooksList(lu, instance.nics),
577
    'disk_template': instance.disk_template,
578
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
579
  }
580
  if override:
581
    args.update(override)
582
  return _BuildInstanceHookEnv(**args)
583

    
584

    
585
def _AdjustCandidatePool(lu):
586
  """Adjust the candidate pool after node operations.
587

588
  """
589
  mod_list = lu.cfg.MaintainCandidatePool()
590
  if mod_list:
591
    lu.LogInfo("Promoted nodes to master candidate role: %s",
592
               ", ".join(node.name for node in mod_list))
593
    for name in mod_list:
594
      lu.context.ReaddNode(name)
595
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
596
  if mc_now > mc_max:
597
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
598
               (mc_now, mc_max))
599

    
600

    
601
def _CheckNicsBridgesExist(lu, target_nics, target_node,
602
                               profile=constants.PP_DEFAULT):
603
  """Check that the brigdes needed by a list of nics exist.
604

605
  """
606
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
607
  paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
608
                for nic in target_nics]
609
  brlist = [params[constants.NIC_LINK] for params in paramslist
610
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
611
  if brlist:
612
    result = lu.rpc.call_bridges_exist(target_node, brlist)
613
    result.Raise("Error checking bridges on destination node '%s'" %
614
                 target_node, prereq=True)
615

    
616

    
617
def _CheckInstanceBridgesExist(lu, instance, node=None):
618
  """Check that the brigdes needed by an instance exist.
619

620
  """
621
  if node is None:
622
    node = instance.primary_node
623
  _CheckNicsBridgesExist(lu, instance.nics, node)
624

    
625

    
626
class LUDestroyCluster(NoHooksLU):
627
  """Logical unit for destroying the cluster.
628

629
  """
630
  _OP_REQP = []
631

    
632
  def CheckPrereq(self):
633
    """Check prerequisites.
634

635
    This checks whether the cluster is empty.
636

637
    Any errors are signalled by raising errors.OpPrereqError.
638

639
    """
640
    master = self.cfg.GetMasterNode()
641

    
642
    nodelist = self.cfg.GetNodeList()
643
    if len(nodelist) != 1 or nodelist[0] != master:
644
      raise errors.OpPrereqError("There are still %d node(s) in"
645
                                 " this cluster." % (len(nodelist) - 1))
646
    instancelist = self.cfg.GetInstanceList()
647
    if instancelist:
648
      raise errors.OpPrereqError("There are still %d instance(s) in"
649
                                 " this cluster." % len(instancelist))
650

    
651
  def Exec(self, feedback_fn):
652
    """Destroys the cluster.
653

654
    """
655
    master = self.cfg.GetMasterNode()
656
    result = self.rpc.call_node_stop_master(master, False)
657
    result.Raise("Could not disable the master role")
658
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
659
    utils.CreateBackup(priv_key)
660
    utils.CreateBackup(pub_key)
661
    return master
662

    
663

    
664
class LUVerifyCluster(LogicalUnit):
665
  """Verifies the cluster status.
666

667
  """
668
  HPATH = "cluster-verify"
669
  HTYPE = constants.HTYPE_CLUSTER
670
  _OP_REQP = ["skip_checks"]
671
  REQ_BGL = False
672

    
673
  def ExpandNames(self):
674
    self.needed_locks = {
675
      locking.LEVEL_NODE: locking.ALL_SET,
676
      locking.LEVEL_INSTANCE: locking.ALL_SET,
677
    }
678
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
679

    
680
  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
681
                  node_result, feedback_fn, master_files,
682
                  drbd_map, vg_name):
683
    """Run multiple tests against a node.
684

685
    Test list:
686

687
      - compares ganeti version
688
      - checks vg existence and size > 20G
689
      - checks config file checksum
690
      - checks ssh to other nodes
691

692
    @type nodeinfo: L{objects.Node}
693
    @param nodeinfo: the node to check
694
    @param file_list: required list of files
695
    @param local_cksum: dictionary of local files and their checksums
696
    @param node_result: the results from the node
697
    @param feedback_fn: function used to accumulate results
698
    @param master_files: list of files that only masters should have
699
    @param drbd_map: the used DRBD minors for this node, in
700
        the form of minor: (instance, must_exist), which correspond to instances
701
        and their running status
702
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
703

704
    """
705
    node = nodeinfo.name
706

    
707
    # main result, node_result should be a non-empty dict
708
    if not node_result or not isinstance(node_result, dict):
709
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
710
      return True
711

    
712
    # compares ganeti version
713
    local_version = constants.PROTOCOL_VERSION
714
    remote_version = node_result.get('version', None)
715
    if not (remote_version and isinstance(remote_version, (list, tuple)) and
716
            len(remote_version) == 2):
717
      feedback_fn("  - ERROR: connection to %s failed" % (node))
718
      return True
719

    
720
    if local_version != remote_version[0]:
721
      feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
722
                  " node %s %s" % (local_version, node, remote_version[0]))
723
      return True
724

    
725
    # node seems compatible, we can actually try to look into its results
726

    
727
    bad = False
728

    
729
    # full package version
730
    if constants.RELEASE_VERSION != remote_version[1]:
731
      feedback_fn("  - WARNING: software version mismatch: master %s,"
732
                  " node %s %s" %
733
                  (constants.RELEASE_VERSION, node, remote_version[1]))
734

    
735
    # checks vg existence and size > 20G
736
    if vg_name is not None:
737
      vglist = node_result.get(constants.NV_VGLIST, None)
738
      if not vglist:
739
        feedback_fn("  - ERROR: unable to check volume groups on node %s." %
740
                        (node,))
741
        bad = True
742
      else:
743
        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
744
                                              constants.MIN_VG_SIZE)
745
        if vgstatus:
746
          feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
747
          bad = True
748

    
749
    # checks config file checksum
750

    
751
    remote_cksum = node_result.get(constants.NV_FILELIST, None)
752
    if not isinstance(remote_cksum, dict):
753
      bad = True
754
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
755
    else:
756
      for file_name in file_list:
757
        node_is_mc = nodeinfo.master_candidate
758
        must_have_file = file_name not in master_files
759
        if file_name not in remote_cksum:
760
          if node_is_mc or must_have_file:
761
            bad = True
762
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
763
        elif remote_cksum[file_name] != local_cksum[file_name]:
764
          if node_is_mc or must_have_file:
765
            bad = True
766
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
767
          else:
768
            # not candidate and this is not a must-have file
769
            bad = True
770
            feedback_fn("  - ERROR: non master-candidate has old/wrong file"
771
                        " '%s'" % file_name)
772
        else:
773
          # all good, except non-master/non-must have combination
774
          if not node_is_mc and not must_have_file:
775
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
776
                        " candidates" % file_name)
777

    
778
    # checks ssh to any
779

    
780
    if constants.NV_NODELIST not in node_result:
781
      bad = True
782
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
783
    else:
784
      if node_result[constants.NV_NODELIST]:
785
        bad = True
786
        for node in node_result[constants.NV_NODELIST]:
787
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
788
                          (node, node_result[constants.NV_NODELIST][node]))
789

    
790
    if constants.NV_NODENETTEST not in node_result:
791
      bad = True
792
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
793
    else:
794
      if node_result[constants.NV_NODENETTEST]:
795
        bad = True
796
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
797
        for node in nlist:
798
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
799
                          (node, node_result[constants.NV_NODENETTEST][node]))
800

    
801
    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
802
    if isinstance(hyp_result, dict):
803
      for hv_name, hv_result in hyp_result.iteritems():
804
        if hv_result is not None:
805
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
806
                      (hv_name, hv_result))
807

    
808
    # check used drbd list
809
    if vg_name is not None:
810
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
811
      if not isinstance(used_minors, (tuple, list)):
812
        feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
813
                    str(used_minors))
814
      else:
815
        for minor, (iname, must_exist) in drbd_map.items():
816
          if minor not in used_minors and must_exist:
817
            feedback_fn("  - ERROR: drbd minor %d of instance %s is"
818
                        " not active" % (minor, iname))
819
            bad = True
820
        for minor in used_minors:
821
          if minor not in drbd_map:
822
            feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
823
                        minor)
824
            bad = True
825

    
826
    return bad
827

    
828
  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
829
                      node_instance, feedback_fn, n_offline):
830
    """Verify an instance.
831

832
    This function checks to see if the required block devices are
833
    available on the instance's node.
834

835
    """
836
    bad = False
837

    
838
    node_current = instanceconfig.primary_node
839

    
840
    node_vol_should = {}
841
    instanceconfig.MapLVsByNode(node_vol_should)
842

    
843
    for node in node_vol_should:
844
      if node in n_offline:
845
        # ignore missing volumes on offline nodes
846
        continue
847
      for volume in node_vol_should[node]:
848
        if node not in node_vol_is or volume not in node_vol_is[node]:
849
          feedback_fn("  - ERROR: volume %s missing on node %s" %
850
                          (volume, node))
851
          bad = True
852

    
853
    if instanceconfig.admin_up:
854
      if ((node_current not in node_instance or
855
          not instance in node_instance[node_current]) and
856
          node_current not in n_offline):
857
        feedback_fn("  - ERROR: instance %s not running on node %s" %
858
                        (instance, node_current))
859
        bad = True
860

    
861
    for node in node_instance:
862
      if (not node == node_current):
863
        if instance in node_instance[node]:
864
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
865
                          (instance, node))
866
          bad = True
867

    
868
    return bad
869

    
870
  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
871
    """Verify if there are any unknown volumes in the cluster.
872

873
    The .os, .swap and backup volumes are ignored. All other volumes are
874
    reported as unknown.
875

876
    """
877
    bad = False
878

    
879
    for node in node_vol_is:
880
      for volume in node_vol_is[node]:
881
        if node not in node_vol_should or volume not in node_vol_should[node]:
882
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
883
                      (volume, node))
884
          bad = True
885
    return bad
886

    
887
  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
888
    """Verify the list of running instances.
889

890
    This checks what instances are running but unknown to the cluster.
891

892
    """
893
    bad = False
894
    for node in node_instance:
895
      for runninginstance in node_instance[node]:
896
        if runninginstance not in instancelist:
897
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
898
                          (runninginstance, node))
899
          bad = True
900
    return bad
901

    
902
  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
903
    """Verify N+1 Memory Resilience.
904

905
    Check that if one single node dies we can still start all the instances it
906
    was primary for.
907

908
    """
909
    bad = False
910

    
911
    for node, nodeinfo in node_info.iteritems():
912
      # This code checks that every node which is now listed as secondary has
913
      # enough memory to host the instances it would take over, should a single
914
      # other node in the cluster fail.
915
      # FIXME: not ready for failover to an arbitrary node
916
      # FIXME: does not support file-backed instances
917
      # WARNING: we currently take into account down instances as well as up
918
      # ones, considering that even if they're down someone might want to start
919
      # them even in the event of a node failure.
920
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
921
        needed_mem = 0
922
        for instance in instances:
923
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
924
          if bep[constants.BE_AUTO_BALANCE]:
925
            needed_mem += bep[constants.BE_MEMORY]
926
        if nodeinfo['mfree'] < needed_mem:
927
          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
928
                      " failovers should node %s fail" % (node, prinode))
929
          bad = True
930
    return bad
931
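  # Worked example (illustration only): if a node is secondary for two
  # auto-balanced instances sharing the same primary node and needing 512 and
  # 1024 MB of memory respectively, it must report at least 1536 MB of free
  # memory; otherwise the check above flags an N+1 failure for that primary.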

    
932
  def CheckPrereq(self):
933
    """Check prerequisites.
934

935
    Transform the list of checks we're going to skip into a set and check that
936
    all its members are valid.
937

938
    """
939
    self.skip_set = frozenset(self.op.skip_checks)
940
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
941
      raise errors.OpPrereqError("Invalid checks to be skipped specified")
942

    
943
  def BuildHooksEnv(self):
944
    """Build hooks env.
945

946
    Cluster-Verify hooks are run only in the post phase, and their failure makes
947
    the output be logged in the verify output and the verification fail.
948

949
    """
950
    all_nodes = self.cfg.GetNodeList()
951
    env = {
952
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
953
      }
954
    for node in self.cfg.GetAllNodesInfo().values():
955
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
956

    
957
    return env, [], all_nodes
958

    
959
  def Exec(self, feedback_fn):
960
    """Verify integrity of cluster, performing various test on nodes.
961

962
    """
963
    bad = False
964
    feedback_fn("* Verifying global settings")
965
    for msg in self.cfg.VerifyConfig():
966
      feedback_fn("  - ERROR: %s" % msg)
967

    
968
    vg_name = self.cfg.GetVGName()
969
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
970
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
971
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
972
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
973
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
974
                        for iname in instancelist)
975
    i_non_redundant = [] # Non redundant instances
976
    i_non_a_balanced = [] # Non auto-balanced instances
977
    n_offline = [] # List of offline nodes
978
    n_drained = [] # List of nodes being drained
979
    node_volume = {}
980
    node_instance = {}
981
    node_info = {}
982
    instance_cfg = {}
983

    
984
    # FIXME: verify OS list
985
    # do local checksums
986
    master_files = [constants.CLUSTER_CONF_FILE]
987

    
988
    file_names = ssconf.SimpleStore().GetFileList()
989
    file_names.append(constants.SSL_CERT_FILE)
990
    file_names.append(constants.RAPI_CERT_FILE)
991
    file_names.extend(master_files)
992

    
993
    local_checksums = utils.FingerprintFiles(file_names)
994

    
995
    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
996
    node_verify_param = {
997
      constants.NV_FILELIST: file_names,
998
      constants.NV_NODELIST: [node.name for node in nodeinfo
999
                              if not node.offline],
1000
      constants.NV_HYPERVISOR: hypervisors,
1001
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1002
                                  node.secondary_ip) for node in nodeinfo
1003
                                 if not node.offline],
1004
      constants.NV_INSTANCELIST: hypervisors,
1005
      constants.NV_VERSION: None,
1006
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1007
      }
1008
    if vg_name is not None:
1009
      node_verify_param[constants.NV_VGLIST] = None
1010
      node_verify_param[constants.NV_LVLIST] = vg_name
1011
      node_verify_param[constants.NV_DRBDLIST] = None
1012
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1013
                                           self.cfg.GetClusterName())
1014

    
1015
    cluster = self.cfg.GetClusterInfo()
1016
    master_node = self.cfg.GetMasterNode()
1017
    all_drbd_map = self.cfg.ComputeDRBDMap()
1018

    
1019
    for node_i in nodeinfo:
1020
      node = node_i.name
1021

    
1022
      if node_i.offline:
1023
        feedback_fn("* Skipping offline node %s" % (node,))
1024
        n_offline.append(node)
1025
        continue
1026

    
1027
      if node == master_node:
1028
        ntype = "master"
1029
      elif node_i.master_candidate:
1030
        ntype = "master candidate"
1031
      elif node_i.drained:
1032
        ntype = "drained"
1033
        n_drained.append(node)
1034
      else:
1035
        ntype = "regular"
1036
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1037

    
1038
      msg = all_nvinfo[node].fail_msg
1039
      if msg:
1040
        feedback_fn("  - ERROR: while contacting node %s: %s" % (node, msg))
1041
        bad = True
1042
        continue
1043

    
1044
      nresult = all_nvinfo[node].payload
1045
      node_drbd = {}
1046
      for minor, instance in all_drbd_map[node].items():
1047
        if instance not in instanceinfo:
1048
          feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1049
                      instance)
1050
          # ghost instance should not be running, but otherwise we
1051
          # don't give double warnings (both ghost instance and
1052
          # unallocated minor in use)
1053
          node_drbd[minor] = (instance, False)
1054
        else:
1055
          instance = instanceinfo[instance]
1056
          node_drbd[minor] = (instance.name, instance.admin_up)
1057
      result = self._VerifyNode(node_i, file_names, local_checksums,
1058
                                nresult, feedback_fn, master_files,
1059
                                node_drbd, vg_name)
1060
      bad = bad or result
1061

    
1062
      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1063
      if vg_name is None:
1064
        node_volume[node] = {}
1065
      elif isinstance(lvdata, basestring):
1066
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1067
                    (node, utils.SafeEncode(lvdata)))
1068
        bad = True
1069
        node_volume[node] = {}
1070
      elif not isinstance(lvdata, dict):
1071
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1072
        bad = True
1073
        continue
1074
      else:
1075
        node_volume[node] = lvdata
1076

    
1077
      # node_instance
1078
      idata = nresult.get(constants.NV_INSTANCELIST, None)
1079
      if not isinstance(idata, list):
1080
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1081
                    (node,))
1082
        bad = True
1083
        continue
1084

    
1085
      node_instance[node] = idata
1086

    
1087
      # node_info
1088
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
1089
      if not isinstance(nodeinfo, dict):
1090
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1091
        bad = True
1092
        continue
1093

    
1094
      try:
1095
        node_info[node] = {
1096
          "mfree": int(nodeinfo['memory_free']),
1097
          "pinst": [],
1098
          "sinst": [],
1099
          # dictionary holding all instances this node is secondary for,
1100
          # grouped by their primary node. Each key is a cluster node, and each
1101
          # value is a list of instances which have the key as primary and the
1102
          # current node as secondary.  this is handy to calculate N+1 memory
1103
          # availability if you can only failover from a primary to its
1104
          # secondary.
1105
          "sinst-by-pnode": {},
1106
        }
1107
        # FIXME: devise a free space model for file based instances as well
1108
        if vg_name is not None:
1109
          if (constants.NV_VGLIST not in nresult or
1110
              vg_name not in nresult[constants.NV_VGLIST]):
1111
            feedback_fn("  - ERROR: node %s didn't return data for the"
1112
                        " volume group '%s' - it is either missing or broken" %
1113
                        (node, vg_name))
1114
            bad = True
1115
            continue
1116
          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1117
      except (ValueError, KeyError):
1118
        feedback_fn("  - ERROR: invalid nodeinfo value returned"
1119
                    " from node %s" % (node,))
1120
        bad = True
1121
        continue
1122

    
1123
    node_vol_should = {}
1124

    
1125
    for instance in instancelist:
1126
      feedback_fn("* Verifying instance %s" % instance)
1127
      inst_config = instanceinfo[instance]
1128
      result =  self._VerifyInstance(instance, inst_config, node_volume,
1129
                                     node_instance, feedback_fn, n_offline)
1130
      bad = bad or result
1131
      inst_nodes_offline = []
1132

    
1133
      inst_config.MapLVsByNode(node_vol_should)
1134

    
1135
      instance_cfg[instance] = inst_config
1136

    
1137
      pnode = inst_config.primary_node
1138
      if pnode in node_info:
1139
        node_info[pnode]['pinst'].append(instance)
1140
      elif pnode not in n_offline:
1141
        feedback_fn("  - ERROR: instance %s, connection to primary node"
1142
                    " %s failed" % (instance, pnode))
1143
        bad = True
1144

    
1145
      if pnode in n_offline:
1146
        inst_nodes_offline.append(pnode)
1147

    
1148
      # If the instance is non-redundant we cannot survive losing its primary
1149
      # node, so we are not N+1 compliant. On the other hand we have no disk
1150
      # templates with more than one secondary so that situation is not well
1151
      # supported either.
1152
      # FIXME: does not support file-backed instances
1153
      if len(inst_config.secondary_nodes) == 0:
1154
        i_non_redundant.append(instance)
1155
      elif len(inst_config.secondary_nodes) > 1:
1156
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
1157
                    % instance)
1158

    
1159
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1160
        i_non_a_balanced.append(instance)
1161

    
1162
      for snode in inst_config.secondary_nodes:
1163
        if snode in node_info:
1164
          node_info[snode]['sinst'].append(instance)
1165
          if pnode not in node_info[snode]['sinst-by-pnode']:
1166
            node_info[snode]['sinst-by-pnode'][pnode] = []
1167
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1168
        elif snode not in n_offline:
1169
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
1170
                      " %s failed" % (instance, snode))
1171
          bad = True
1172
        if snode in n_offline:
1173
          inst_nodes_offline.append(snode)
1174

    
1175
      if inst_nodes_offline:
1176
        # warn that the instance lives on offline nodes, and set bad=True
1177
        feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1178
                    ", ".join(inst_nodes_offline))
1179
        bad = True
1180

    
1181
    feedback_fn("* Verifying orphan volumes")
1182
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1183
                                       feedback_fn)
1184
    bad = bad or result
1185

    
1186
    feedback_fn("* Verifying remaining instances")
1187
    result = self._VerifyOrphanInstances(instancelist, node_instance,
1188
                                         feedback_fn)
1189
    bad = bad or result
1190

    
1191
    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1192
      feedback_fn("* Verifying N+1 Memory redundancy")
1193
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1194
      bad = bad or result
1195

    
1196
    feedback_fn("* Other Notes")
1197
    if i_non_redundant:
1198
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1199
                  % len(i_non_redundant))
1200

    
1201
    if i_non_a_balanced:
1202
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1203
                  % len(i_non_a_balanced))
1204

    
1205
    if n_offline:
1206
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1207

    
1208
    if n_drained:
1209
      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1210

    
1211
    return not bad
1212

    
1213
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1214
    """Analize the post-hooks' result
1215

1216
    This method analyzes the hook result, handles it, and sends some
1217
    nicely-formatted feedback back to the user.
1218

1219
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
1220
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1221
    @param hooks_results: the results of the multi-node hooks rpc call
1222
    @param feedback_fn: function used to send feedback back to the caller
1223
    @param lu_result: previous Exec result
1224
    @return: the new Exec result, based on the previous result
1225
        and hook results
1226

1227
    """
1228
    # We only really run POST phase hooks, and are only interested in
1229
    # their results
1230
    if phase == constants.HOOKS_PHASE_POST:
1231
      # Used to change hooks' output to proper indentation
1232
      indent_re = re.compile('^', re.M)
1233
      feedback_fn("* Hooks Results")
1234
      if not hooks_results:
1235
        feedback_fn("  - ERROR: general communication failure")
1236
        lu_result = 1
1237
      else:
1238
        for node_name in hooks_results:
1239
          show_node_header = True
1240
          res = hooks_results[node_name]
1241
          msg = res.fail_msg
1242
          if msg:
1243
            if res.offline:
1244
              # no need to warn or set fail return value
1245
              continue
1246
            feedback_fn("    Communication failure in hooks execution: %s" %
1247
                        msg)
1248
            lu_result = 1
1249
            continue
1250
          for script, hkr, output in res.payload:
1251
            if hkr == constants.HKR_FAIL:
1252
              # The node header is only shown once, if there are
1253
              # failing hooks on that node
1254
              if show_node_header:
1255
                feedback_fn("  Node %s:" % node_name)
1256
                show_node_header = False
1257
              feedback_fn("    ERROR: Script %s failed, output:" % script)
1258
              output = indent_re.sub('      ', output)
1259
              feedback_fn("%s" % output)
1260
              lu_result = 1
1261

    
1262
      return lu_result
1263

    
1264

    
1265
class LUVerifyDisks(NoHooksLU):
1266
  """Verifies the cluster disks status.
1267

1268
  """
1269
  _OP_REQP = []
1270
  REQ_BGL = False
1271

    
1272
  def ExpandNames(self):
1273
    self.needed_locks = {
1274
      locking.LEVEL_NODE: locking.ALL_SET,
1275
      locking.LEVEL_INSTANCE: locking.ALL_SET,
1276
    }
1277
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1278

    
1279
  def CheckPrereq(self):
1280
    """Check prerequisites.
1281

1282
    This has no prerequisites.
1283

1284
    """
1285
    pass
1286

    
1287
  def Exec(self, feedback_fn):
1288
    """Verify integrity of cluster disks.
1289

1290
    @rtype: tuple of three items
1291
    @return: a tuple of (dict of node-to-node_error, list of instances
1292
        which need activate-disks, dict of instance: (node, volume) for
1293
        missing volumes)
1294

1295
    """
1296
    result = res_nodes, res_instances, res_missing = {}, [], {}
1297

    
1298
    vg_name = self.cfg.GetVGName()
1299
    nodes = utils.NiceSort(self.cfg.GetNodeList())
1300
    instances = [self.cfg.GetInstanceInfo(name)
1301
                 for name in self.cfg.GetInstanceList()]
1302

    
1303
    nv_dict = {}
1304
    for inst in instances:
1305
      inst_lvs = {}
1306
      if (not inst.admin_up or
1307
          inst.disk_template not in constants.DTS_NET_MIRROR):
1308
        continue
1309
      inst.MapLVsByNode(inst_lvs)
1310
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1311
      for node, vol_list in inst_lvs.iteritems():
1312
        for vol in vol_list:
1313
          nv_dict[(node, vol)] = inst
1314

    
1315
    if not nv_dict:
1316
      return result
1317

    
1318
    node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1319

    
1320
    to_act = set()
1321
    for node in nodes:
1322
      # node_volume
1323
      node_res = node_lvs[node]
1324
      if node_res.offline:
1325
        continue
1326
      msg = node_res.fail_msg
1327
      if msg:
1328
        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1329
        res_nodes[node] = msg
1330
        continue
1331

    
1332
      lvs = node_res.payload
1333
      for lv_name, (_, lv_inactive, lv_online) in lvs.items():
1334
        inst = nv_dict.pop((node, lv_name), None)
1335
        if (not lv_online and inst is not None
1336
            and inst.name not in res_instances):
1337
          res_instances.append(inst.name)
1338

    
1339
    # any leftover items in nv_dict are missing LVs, let's arrange the
1340
    # data better
1341
    for key, inst in nv_dict.iteritems():
1342
      if inst.name not in res_missing:
1343
        res_missing[inst.name] = []
1344
      res_missing[inst.name].append(key)
1345

    
1346
    return result
1347

    
1348
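# Illustrative shape of the LUVerifyDisks.Exec result (all names below are
# hypothetical): one node that could not be queried, one instance whose disks
# need to be re-activated, and one missing logical volume would show up as
#
#   ({"node2.example.com": "error message from the node"},
#    ["instance1.example.com"],
#    {"instance2.example.com": [("node1.example.com", "xenvg/disk0")]})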

    
1349
class LURenameCluster(LogicalUnit):
1350
  """Rename the cluster.
1351

1352
  """
1353
  HPATH = "cluster-rename"
1354
  HTYPE = constants.HTYPE_CLUSTER
1355
  _OP_REQP = ["name"]
1356

    
1357
  def BuildHooksEnv(self):
1358
    """Build hooks env.
1359

1360
    """
1361
    env = {
1362
      "OP_TARGET": self.cfg.GetClusterName(),
1363
      "NEW_NAME": self.op.name,
1364
      }
1365
    mn = self.cfg.GetMasterNode()
1366
    return env, [mn], [mn]
1367

    
1368
  def CheckPrereq(self):
1369
    """Verify that the passed name is a valid one.
1370

1371
    """
1372
    hostname = utils.HostInfo(self.op.name)
1373

    
1374
    new_name = hostname.name
1375
    self.ip = new_ip = hostname.ip
1376
    old_name = self.cfg.GetClusterName()
1377
    old_ip = self.cfg.GetMasterIP()
1378
    if new_name == old_name and new_ip == old_ip:
1379
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
1380
                                 " cluster has changed")
1381
    if new_ip != old_ip:
1382
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1383
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
1384
                                   " reachable on the network. Aborting." %
1385
                                   new_ip)
1386

    
1387
    self.op.name = new_name
1388

    
1389
  def Exec(self, feedback_fn):
1390
    """Rename the cluster.
1391

1392
    """
1393
    clustername = self.op.name
1394
    ip = self.ip
1395

    
1396
    # shutdown the master IP
1397
    master = self.cfg.GetMasterNode()
1398
    result = self.rpc.call_node_stop_master(master, False)
1399
    result.Raise("Could not disable the master role")
1400

    
1401
    try:
1402
      cluster = self.cfg.GetClusterInfo()
1403
      cluster.cluster_name = clustername
1404
      cluster.master_ip = ip
1405
      self.cfg.Update(cluster)
1406

    
1407
      # update the known hosts file
1408
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1409
      node_list = self.cfg.GetNodeList()
1410
      try:
1411
        node_list.remove(master)
1412
      except ValueError:
1413
        pass
1414
      result = self.rpc.call_upload_file(node_list,
1415
                                         constants.SSH_KNOWN_HOSTS_FILE)
1416
      for to_node, to_result in result.iteritems():
1417
         msg = to_result.fail_msg
1418
         if msg:
1419
           msg = ("Copy of file %s to node %s failed: %s" %
1420
                   (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1421
           self.proc.LogWarning(msg)
1422

    
1423
    finally:
1424
      result = self.rpc.call_node_start_master(master, False)
1425
      msg = result.fail_msg
1426
      if msg:
1427
        self.LogWarning("Could not re-enable the master role on"
1428
                        " the master, please restart manually: %s", msg)
1429

    
1430

    
1431
def _RecursiveCheckIfLVMBased(disk):
1432
  """Check if the given disk or its children are lvm-based.
1433

1434
  @type disk: L{objects.Disk}
1435
  @param disk: the disk to check
1436
  @rtype: boolean
1437
  @return: boolean indicating whether a LD_LV dev_type was found or not
1438

1439
  """
1440
  if disk.children:
1441
    for chdisk in disk.children:
1442
      if _RecursiveCheckIfLVMBased(chdisk):
1443
        return True
1444
  return disk.dev_type == constants.LD_LV
1445

    
1446

    
1447
class LUSetClusterParams(LogicalUnit):
1448
  """Change the parameters of the cluster.
1449

1450
  """
1451
  HPATH = "cluster-modify"
1452
  HTYPE = constants.HTYPE_CLUSTER
1453
  _OP_REQP = []
1454
  REQ_BGL = False
1455

    
1456
  def CheckArguments(self):
1457
    """Check parameters
1458

1459
    """
1460
    if not hasattr(self.op, "candidate_pool_size"):
1461
      self.op.candidate_pool_size = None
1462
    if self.op.candidate_pool_size is not None:
1463
      try:
1464
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1465
      except (ValueError, TypeError), err:
1466
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1467
                                   str(err))
1468
      if self.op.candidate_pool_size < 1:
1469
        raise errors.OpPrereqError("At least one master candidate needed")
1470

    
1471
  def ExpandNames(self):
1472
    # FIXME: in the future maybe other cluster params won't require checking on
1473
    # all nodes to be modified.
1474
    self.needed_locks = {
1475
      locking.LEVEL_NODE: locking.ALL_SET,
1476
    }
1477
    self.share_locks[locking.LEVEL_NODE] = 1
1478

    
1479
  def BuildHooksEnv(self):
1480
    """Build hooks env.
1481

1482
    """
1483
    env = {
1484
      "OP_TARGET": self.cfg.GetClusterName(),
1485
      "NEW_VG_NAME": self.op.vg_name,
1486
      }
1487
    mn = self.cfg.GetMasterNode()
1488
    return env, [mn], [mn]
1489

    
1490
  def CheckPrereq(self):
1491
    """Check prerequisites.
1492

1493
    This checks whether the given params don't conflict and
1494
    if the given volume group is valid.
1495

1496
    """
1497
    if self.op.vg_name is not None and not self.op.vg_name:
1498
      instances = self.cfg.GetAllInstancesInfo().values()
1499
      for inst in instances:
1500
        for disk in inst.disks:
1501
          if _RecursiveCheckIfLVMBased(disk):
1502
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1503
                                       " lvm-based instances exist")
1504

    
1505
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1506

    
1507
    # if vg_name not None, checks given volume group on all nodes
1508
    if self.op.vg_name:
1509
      vglist = self.rpc.call_vg_list(node_list)
1510
      for node in node_list:
1511
        msg = vglist[node].fail_msg
1512
        if msg:
1513
          # ignoring down node
1514
          self.LogWarning("Error while gathering data on node %s"
1515
                          " (ignoring node): %s", node, msg)
1516
          continue
1517
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1518
                                              self.op.vg_name,
1519
                                              constants.MIN_VG_SIZE)
1520
        if vgstatus:
1521
          raise errors.OpPrereqError("Error on node '%s': %s" %
1522
                                     (node, vgstatus))
1523

    
1524
    self.cluster = cluster = self.cfg.GetClusterInfo()
1525
    # validate params changes
1526
    if self.op.beparams:
1527
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1528
      self.new_beparams = objects.FillDict(
1529
        cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1530

    
1531
    if self.op.nicparams:
1532
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1533
      self.new_nicparams = objects.FillDict(
1534
        cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1535
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
1536

    
1537
    # hypervisor list/parameters
1538
    self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1539
    if self.op.hvparams:
1540
      if not isinstance(self.op.hvparams, dict):
1541
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1542
      for hv_name, hv_dict in self.op.hvparams.items():
1543
        if hv_name not in self.new_hvparams:
1544
          self.new_hvparams[hv_name] = hv_dict
1545
        else:
1546
          self.new_hvparams[hv_name].update(hv_dict)
1547

    
1548
    if self.op.enabled_hypervisors is not None:
1549
      self.hv_list = self.op.enabled_hypervisors
1550
    else:
1551
      self.hv_list = cluster.enabled_hypervisors
1552

    
1553
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1554
      # either the enabled list has changed, or the parameters have, validate
1555
      for hv_name, hv_params in self.new_hvparams.items():
1556
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1557
            (self.op.enabled_hypervisors and
1558
             hv_name in self.op.enabled_hypervisors)):
1559
          # either this is a new hypervisor, or its parameters have changed
1560
          hv_class = hypervisor.GetHypervisor(hv_name)
1561
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1562
          hv_class.CheckParameterSyntax(hv_params)
1563
          _CheckHVParams(self, node_list, hv_name, hv_params)
1564

    
1565
  def Exec(self, feedback_fn):
1566
    """Change the parameters of the cluster.
1567

1568
    """
1569
    if self.op.vg_name is not None:
1570
      new_volume = self.op.vg_name
1571
      if not new_volume:
1572
        new_volume = None
1573
      if new_volume != self.cfg.GetVGName():
1574
        self.cfg.SetVGName(new_volume)
1575
      else:
1576
        feedback_fn("Cluster LVM configuration already in desired"
1577
                    " state, not changing")
1578
    if self.op.hvparams:
1579
      self.cluster.hvparams = self.new_hvparams
1580
    if self.op.enabled_hypervisors is not None:
1581
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1582
    if self.op.beparams:
1583
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1584
    if self.op.nicparams:
1585
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1586

    
1587
    if self.op.candidate_pool_size is not None:
1588
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1589

    
1590
    self.cfg.Update(self.cluster)
1591

    
1592
    # we want to update nodes after the cluster so that if any errors
1593
    # happen, we have recorded and saved the cluster info
1594
    if self.op.candidate_pool_size is not None:
1595
      _AdjustCandidatePool(self)
1596

    
1597

    
1598
def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1599
  """Distribute additional files which are part of the cluster configuration.
1600

1601
  ConfigWriter takes care of distributing the config and ssconf files, but
1602
  there are more files which should be distributed to all nodes. This function
1603
  makes sure those are copied.
1604

1605
  @param lu: calling logical unit
1606
  @param additional_nodes: list of nodes not in the config to distribute to
1607

1608
  """
1609
  # 1. Gather target nodes
1610
  myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1611
  dist_nodes = lu.cfg.GetNodeList()
1612
  if additional_nodes is not None:
1613
    dist_nodes.extend(additional_nodes)
1614
  if myself.name in dist_nodes:
1615
    dist_nodes.remove(myself.name)
1616
  # 2. Gather files to distribute
1617
  dist_files = set([constants.ETC_HOSTS,
1618
                    constants.SSH_KNOWN_HOSTS_FILE,
1619
                    constants.RAPI_CERT_FILE,
1620
                    constants.RAPI_USERS_FILE,
1621
                   ])
1622

    
1623
  enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1624
  for hv_name in enabled_hypervisors:
1625
    hv_class = hypervisor.GetHypervisor(hv_name)
1626
    dist_files.update(hv_class.GetAncillaryFiles())
1627

    
1628
  # 3. Perform the files upload
1629
  for fname in dist_files:
1630
    if os.path.exists(fname):
1631
      result = lu.rpc.call_upload_file(dist_nodes, fname)
1632
      for to_node, to_result in result.items():
1633
         msg = to_result.fail_msg
1634
         if msg:
1635
           msg = ("Copy of file %s to node %s failed: %s" %
1636
                   (fname, to_node, msg))
1637
           lu.proc.LogWarning(msg)
1638

    
1639

    
1640
class LURedistributeConfig(NoHooksLU):
1641
  """Force the redistribution of cluster configuration.
1642

1643
  This is a very simple LU.
1644

1645
  """
1646
  _OP_REQP = []
1647
  REQ_BGL = False
1648

    
1649
  def ExpandNames(self):
1650
    self.needed_locks = {
1651
      locking.LEVEL_NODE: locking.ALL_SET,
1652
    }
1653
    self.share_locks[locking.LEVEL_NODE] = 1
1654

    
1655
  def CheckPrereq(self):
1656
    """Check prerequisites.
1657

1658
    """
1659

    
1660
  def Exec(self, feedback_fn):
1661
    """Redistribute the configuration.
1662

1663
    """
1664
    self.cfg.Update(self.cfg.GetClusterInfo())
1665
    _RedistributeAncillaryFiles(self)
1666

    
1667

    
1668
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1669
  """Sleep and poll for an instance's disk to sync.
1670

1671
  """
1672
  if not instance.disks:
1673
    return True
1674

    
1675
  if not oneshot:
1676
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1677

    
1678
  node = instance.primary_node
1679

    
1680
  for dev in instance.disks:
1681
    lu.cfg.SetDiskID(dev, node)
1682

    
1683
  retries = 0
1684
  while True:
1685
    max_time = 0
1686
    done = True
1687
    cumul_degraded = False
1688
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1689
    msg = rstats.fail_msg
1690
    if msg:
1691
      lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1692
      retries += 1
1693
      if retries >= 10:
1694
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1695
                                 " aborting." % node)
1696
      time.sleep(6)
1697
      continue
1698
    rstats = rstats.payload
1699
    retries = 0
1700
    for i, mstat in enumerate(rstats):
1701
      if mstat is None:
1702
        lu.LogWarning("Can't compute data for node %s/%s",
1703
                           node, instance.disks[i].iv_name)
1704
        continue
1705
      # we ignore the ldisk parameter
1706
      perc_done, est_time, is_degraded, _ = mstat
1707
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1708
      if perc_done is not None:
1709
        done = False
1710
        if est_time is not None:
1711
          rem_time = "%d estimated seconds remaining" % est_time
1712
          max_time = est_time
1713
        else:
1714
          rem_time = "no time estimate"
1715
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1716
                        (instance.disks[i].iv_name, perc_done, rem_time))
1717
    if done or oneshot:
1718
      break
1719

    
1720
    time.sleep(min(60, max_time))
1721

    
1722
  if done:
1723
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1724
  return not cumul_degraded
1725

    
1726

    
1727
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1728
  """Check that mirrors are not degraded.
1729

1730
  The ldisk parameter, if True, will change the test from the
1731
  is_degraded attribute (which represents overall non-ok status for
1732
  the device(s)) to the ldisk (representing the local storage status).
1733

1734
  """
1735
  lu.cfg.SetDiskID(dev, node)
1736
  if ldisk:
1737
    idx = 6
1738
  else:
1739
    idx = 5
1740

    
1741
  result = True
1742
  if on_primary or dev.AssembleOnSecondary():
1743
    rstats = lu.rpc.call_blockdev_find(node, dev)
1744
    msg = rstats.fail_msg
1745
    if msg:
1746
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1747
      result = False
1748
    elif not rstats.payload:
1749
      lu.LogWarning("Can't find disk on node %s", node)
1750
      result = False
1751
    else:
1752
      result = result and (not rstats.payload[idx])
1753
  if dev.children:
1754
    for child in dev.children:
1755
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1756

    
1757
  return result
1758

    
1759

    
1760
class LUDiagnoseOS(NoHooksLU):
1761
  """Logical unit for OS diagnose/query.
1762

1763
  """
1764
  _OP_REQP = ["output_fields", "names"]
1765
  REQ_BGL = False
1766
  _FIELDS_STATIC = utils.FieldSet()
1767
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1768

    
1769
  def ExpandNames(self):
1770
    if self.op.names:
1771
      raise errors.OpPrereqError("Selective OS query not supported")
1772

    
1773
    _CheckOutputFields(static=self._FIELDS_STATIC,
1774
                       dynamic=self._FIELDS_DYNAMIC,
1775
                       selected=self.op.output_fields)
1776

    
1777
    # Lock all nodes, in shared mode
1778
    # Temporary removal of locks, should be reverted later
1779
    # TODO: reintroduce locks when they are lighter-weight
1780
    self.needed_locks = {}
1781
    #self.share_locks[locking.LEVEL_NODE] = 1
1782
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1783

    
1784
  def CheckPrereq(self):
1785
    """Check prerequisites.
1786

1787
    """
1788

    
1789
  @staticmethod
1790
  def _DiagnoseByOS(node_list, rlist):
1791
    """Remaps a per-node return list into an a per-os per-node dictionary
1792

1793
    @param node_list: a list with the names of all nodes
1794
    @param rlist: a map with node names as keys and OS objects as values
1795

1796
    @rtype: dict
1797
    @return: a dictionary with osnames as keys and as value another map, with
1798
        nodes as keys and tuples of (path, status, diagnose) as values, eg::
1799

1800
          {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
1801
                                     (/srv/..., False, "invalid api")],
1802
                           "node2": [(/srv/..., True, "")]}
1803
          }
1804

1805
    """
1806
    all_os = {}
1807
    # we build here the list of nodes that didn't fail the RPC (at RPC
1808
    # level), so that nodes with a non-responding node daemon don't
1809
    # make all OSes invalid
1810
    good_nodes = [node_name for node_name in rlist
1811
                  if not rlist[node_name].fail_msg]
1812
    for node_name, nr in rlist.items():
1813
      if nr.fail_msg or not nr.payload:
1814
        continue
1815
      for name, path, status, diagnose in nr.payload:
1816
        if name not in all_os:
1817
          # build a list of nodes for this os containing empty lists
1818
          # for each node in node_list
1819
          all_os[name] = {}
1820
          for nname in good_nodes:
1821
            all_os[name][nname] = []
1822
        all_os[name][node_name].append((path, status, diagnose))
1823
    return all_os
1824

    
1825
  def Exec(self, feedback_fn):
1826
    """Compute the list of OSes.
1827

1828
    """
1829
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1830
    node_data = self.rpc.call_os_diagnose(valid_nodes)
1831
    pol = self._DiagnoseByOS(valid_nodes, node_data)
1832
    output = []
1833
    for os_name, os_data in pol.items():
1834
      row = []
1835
      for field in self.op.output_fields:
1836
        if field == "name":
1837
          val = os_name
1838
        elif field == "valid":
1839
          val = utils.all([osl and osl[0][1] for osl in os_data.values()])
1840
        elif field == "node_status":
1841
          # this is just a copy of the dict
1842
          val = {}
1843
          for node_name, nos_list in os_data.items():
1844
            val[node_name] = nos_list
1845
        else:
1846
          raise errors.ParameterError(field)
1847
        row.append(val)
1848
      output.append(row)
1849

    
1850
    return output
1851

    
1852

    
1853
class LURemoveNode(LogicalUnit):
1854
  """Logical unit for removing a node.
1855

1856
  """
1857
  HPATH = "node-remove"
1858
  HTYPE = constants.HTYPE_NODE
1859
  _OP_REQP = ["node_name"]
1860

    
1861
  def BuildHooksEnv(self):
1862
    """Build hooks env.
1863

1864
    This doesn't run on the target node in the pre phase as a failed
1865
    node would then be impossible to remove.
1866

1867
    """
1868
    env = {
1869
      "OP_TARGET": self.op.node_name,
1870
      "NODE_NAME": self.op.node_name,
1871
      }
1872
    all_nodes = self.cfg.GetNodeList()
1873
    all_nodes.remove(self.op.node_name)
1874
    return env, all_nodes, all_nodes
1875

    
1876
  def CheckPrereq(self):
1877
    """Check prerequisites.
1878

1879
    This checks:
1880
     - the node exists in the configuration
1881
     - it does not have primary or secondary instances
1882
     - it's not the master
1883

1884
    Any errors are signalled by raising errors.OpPrereqError.
1885

1886
    """
1887
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1888
    if node is None:
1889
      raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1890

    
1891
    instance_list = self.cfg.GetInstanceList()
1892

    
1893
    masternode = self.cfg.GetMasterNode()
1894
    if node.name == masternode:
1895
      raise errors.OpPrereqError("Node is the master node,"
1896
                                 " you need to failover first.")
1897

    
1898
    for instance_name in instance_list:
1899
      instance = self.cfg.GetInstanceInfo(instance_name)
1900
      if node.name in instance.all_nodes:
1901
        raise errors.OpPrereqError("Instance %s is still running on the node,"
1902
                                   " please remove first." % instance_name)
1903
    self.op.node_name = node.name
1904
    self.node = node
1905

    
1906
  def Exec(self, feedback_fn):
1907
    """Removes the node from the cluster.
1908

1909
    """
1910
    node = self.node
1911
    logging.info("Stopping the node daemon and removing configs from node %s",
1912
                 node.name)
1913

    
1914
    self.context.RemoveNode(node.name)
1915

    
1916
    result = self.rpc.call_node_leave_cluster(node.name)
1917
    msg = result.fail_msg
1918
    if msg:
1919
      self.LogWarning("Errors encountered on the remote node while leaving"
1920
                      " the cluster: %s", msg)
1921

    
1922
    # Promote nodes to master candidate as needed
1923
    _AdjustCandidatePool(self)
1924

    
1925

    
1926
class LUQueryNodes(NoHooksLU):
1927
  """Logical unit for querying nodes.
1928

1929
  """
1930
  _OP_REQP = ["output_fields", "names", "use_locking"]
1931
  REQ_BGL = False
1932
  _FIELDS_DYNAMIC = utils.FieldSet(
1933
    "dtotal", "dfree",
1934
    "mtotal", "mnode", "mfree",
1935
    "bootid",
1936
    "ctotal", "cnodes", "csockets",
1937
    )
1938

    
1939
  _FIELDS_STATIC = utils.FieldSet(
1940
    "name", "pinst_cnt", "sinst_cnt",
1941
    "pinst_list", "sinst_list",
1942
    "pip", "sip", "tags",
1943
    "serial_no",
1944
    "master_candidate",
1945
    "master",
1946
    "offline",
1947
    "drained",
1948
    )
1949

    
1950
  def ExpandNames(self):
1951
    _CheckOutputFields(static=self._FIELDS_STATIC,
1952
                       dynamic=self._FIELDS_DYNAMIC,
1953
                       selected=self.op.output_fields)
1954

    
1955
    self.needed_locks = {}
1956
    self.share_locks[locking.LEVEL_NODE] = 1
1957

    
1958
    if self.op.names:
1959
      self.wanted = _GetWantedNodes(self, self.op.names)
1960
    else:
1961
      self.wanted = locking.ALL_SET
1962

    
1963
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1964
    self.do_locking = self.do_node_query and self.op.use_locking
1965
    if self.do_locking:
1966
      # if we don't request only static fields, we need to lock the nodes
1967
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
1968

    
1969

    
1970
  def CheckPrereq(self):
1971
    """Check prerequisites.
1972

1973
    """
1974
    # The validation of the node list is done in the _GetWantedNodes,
1975
    # if non empty, and if empty, there's no validation to do
1976
    pass
1977

    
1978
  def Exec(self, feedback_fn):
1979
    """Computes the list of nodes and their attributes.
1980

1981
    """
1982
    all_info = self.cfg.GetAllNodesInfo()
1983
    if self.do_locking:
1984
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1985
    elif self.wanted != locking.ALL_SET:
1986
      nodenames = self.wanted
1987
      missing = set(nodenames).difference(all_info.keys())
1988
      if missing:
1989
        raise errors.OpExecError(
1990
          "Some nodes were removed before retrieving their data: %s" % missing)
1991
    else:
1992
      nodenames = all_info.keys()
1993

    
1994
    nodenames = utils.NiceSort(nodenames)
1995
    nodelist = [all_info[name] for name in nodenames]
1996

    
1997
    # begin data gathering
1998

    
1999
    if self.do_node_query:
2000
      live_data = {}
2001
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2002
                                          self.cfg.GetHypervisorType())
2003
      for name in nodenames:
2004
        nodeinfo = node_data[name]
2005
        if not nodeinfo.fail_msg and nodeinfo.payload:
2006
          nodeinfo = nodeinfo.payload
2007
          fn = utils.TryConvert
2008
          live_data[name] = {
2009
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2010
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2011
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
2012
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2013
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
2014
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2015
            "bootid": nodeinfo.get('bootid', None),
2016
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2017
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2018
            }
2019
        else:
2020
          live_data[name] = {}
2021
    else:
2022
      live_data = dict.fromkeys(nodenames, {})
2023

    
2024
    node_to_primary = dict([(name, set()) for name in nodenames])
2025
    node_to_secondary = dict([(name, set()) for name in nodenames])
2026

    
2027
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
2028
                             "sinst_cnt", "sinst_list"))
2029
    if inst_fields & frozenset(self.op.output_fields):
2030
      instancelist = self.cfg.GetInstanceList()
2031

    
2032
      for instance_name in instancelist:
2033
        inst = self.cfg.GetInstanceInfo(instance_name)
2034
        if inst.primary_node in node_to_primary:
2035
          node_to_primary[inst.primary_node].add(inst.name)
2036
        for secnode in inst.secondary_nodes:
2037
          if secnode in node_to_secondary:
2038
            node_to_secondary[secnode].add(inst.name)
2039

    
2040
    master_node = self.cfg.GetMasterNode()
2041

    
2042
    # end data gathering
2043

    
2044
    output = []
2045
    for node in nodelist:
2046
      node_output = []
2047
      for field in self.op.output_fields:
2048
        if field == "name":
2049
          val = node.name
2050
        elif field == "pinst_list":
2051
          val = list(node_to_primary[node.name])
2052
        elif field == "sinst_list":
2053
          val = list(node_to_secondary[node.name])
2054
        elif field == "pinst_cnt":
2055
          val = len(node_to_primary[node.name])
2056
        elif field == "sinst_cnt":
2057
          val = len(node_to_secondary[node.name])
2058
        elif field == "pip":
2059
          val = node.primary_ip
2060
        elif field == "sip":
2061
          val = node.secondary_ip
2062
        elif field == "tags":
2063
          val = list(node.GetTags())
2064
        elif field == "serial_no":
2065
          val = node.serial_no
2066
        elif field == "master_candidate":
2067
          val = node.master_candidate
2068
        elif field == "master":
2069
          val = node.name == master_node
2070
        elif field == "offline":
2071
          val = node.offline
2072
        elif field == "drained":
2073
          val = node.drained
2074
        elif self._FIELDS_DYNAMIC.Matches(field):
2075
          val = live_data[node.name].get(field, None)
2076
        else:
2077
          raise errors.ParameterError(field)
2078
        node_output.append(val)
2079
      output.append(node_output)
2080

    
2081
    return output
2082

    
2083

    
2084
class LUQueryNodeVolumes(NoHooksLU):
2085
  """Logical unit for getting volumes on node(s).
2086

2087
  """
2088
  _OP_REQP = ["nodes", "output_fields"]
2089
  REQ_BGL = False
2090
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2091
  _FIELDS_STATIC = utils.FieldSet("node")
2092

    
2093
  def ExpandNames(self):
2094
    _CheckOutputFields(static=self._FIELDS_STATIC,
2095
                       dynamic=self._FIELDS_DYNAMIC,
2096
                       selected=self.op.output_fields)
2097

    
2098
    self.needed_locks = {}
2099
    self.share_locks[locking.LEVEL_NODE] = 1
2100
    if not self.op.nodes:
2101
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2102
    else:
2103
      self.needed_locks[locking.LEVEL_NODE] = \
2104
        _GetWantedNodes(self, self.op.nodes)
2105

    
2106
  def CheckPrereq(self):
2107
    """Check prerequisites.
2108

2109
    This checks that the fields required are valid output fields.
2110

2111
    """
2112
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2113

    
2114
  def Exec(self, feedback_fn):
2115
    """Computes the list of nodes and their attributes.
2116

2117
    """
2118
    nodenames = self.nodes
2119
    volumes = self.rpc.call_node_volumes(nodenames)
2120

    
2121
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
2122
             in self.cfg.GetInstanceList()]
2123

    
2124
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2125

    
2126
    output = []
2127
    for node in nodenames:
2128
      nresult = volumes[node]
2129
      if nresult.offline:
2130
        continue
2131
      msg = nresult.fail_msg
2132
      if msg:
2133
        self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2134
        continue
2135

    
2136
      node_vols = nresult.payload[:]
2137
      node_vols.sort(key=lambda vol: vol['dev'])
2138

    
2139
      for vol in node_vols:
2140
        node_output = []
2141
        for field in self.op.output_fields:
2142
          if field == "node":
2143
            val = node
2144
          elif field == "phys":
2145
            val = vol['dev']
2146
          elif field == "vg":
2147
            val = vol['vg']
2148
          elif field == "name":
2149
            val = vol['name']
2150
          elif field == "size":
2151
            val = int(float(vol['size']))
2152
          elif field == "instance":
2153
            for inst in ilist:
2154
              if node not in lv_by_node[inst]:
2155
                continue
2156
              if vol['name'] in lv_by_node[inst][node]:
2157
                val = inst.name
2158
                break
2159
            else:
2160
              val = '-'
2161
          else:
2162
            raise errors.ParameterError(field)
2163
          node_output.append(str(val))
2164

    
2165
        output.append(node_output)
2166

    
2167
    return output
2168

    
2169

    
2170
class LUAddNode(LogicalUnit):
2171
  """Logical unit for adding node to the cluster.
2172

2173
  """
2174
  HPATH = "node-add"
2175
  HTYPE = constants.HTYPE_NODE
2176
  _OP_REQP = ["node_name"]
2177

    
2178
  def BuildHooksEnv(self):
2179
    """Build hooks env.
2180

2181
    This will run on all nodes before, and on all nodes + the new node after.
2182

2183
    """
2184
    env = {
2185
      "OP_TARGET": self.op.node_name,
2186
      "NODE_NAME": self.op.node_name,
2187
      "NODE_PIP": self.op.primary_ip,
2188
      "NODE_SIP": self.op.secondary_ip,
2189
      }
2190
    nodes_0 = self.cfg.GetNodeList()
2191
    nodes_1 = nodes_0 + [self.op.node_name, ]
2192
    return env, nodes_0, nodes_1
2193

    
2194
  def CheckPrereq(self):
2195
    """Check prerequisites.
2196

2197
    This checks:
2198
     - the new node is not already in the config
2199
     - it is resolvable
2200
     - its parameters (single/dual homed) matches the cluster
2201

2202
    Any errors are signalled by raising errors.OpPrereqError.
2203

2204
    """
2205
    node_name = self.op.node_name
2206
    cfg = self.cfg
2207

    
2208
    dns_data = utils.HostInfo(node_name)
2209

    
2210
    node = dns_data.name
2211
    primary_ip = self.op.primary_ip = dns_data.ip
2212
    secondary_ip = getattr(self.op, "secondary_ip", None)
2213
    if secondary_ip is None:
2214
      secondary_ip = primary_ip
2215
    if not utils.IsValidIP(secondary_ip):
2216
      raise errors.OpPrereqError("Invalid secondary IP given")
2217
    self.op.secondary_ip = secondary_ip
2218

    
2219
    node_list = cfg.GetNodeList()
2220
    if not self.op.readd and node in node_list:
2221
      raise errors.OpPrereqError("Node %s is already in the configuration" %
2222
                                 node)
2223
    elif self.op.readd and node not in node_list:
2224
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2225

    
2226
    for existing_node_name in node_list:
2227
      existing_node = cfg.GetNodeInfo(existing_node_name)
2228

    
2229
      if self.op.readd and node == existing_node_name:
2230
        if (existing_node.primary_ip != primary_ip or
2231
            existing_node.secondary_ip != secondary_ip):
2232
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
2233
                                     " address configuration as before")
2234
        continue
2235

    
2236
      if (existing_node.primary_ip == primary_ip or
2237
          existing_node.secondary_ip == primary_ip or
2238
          existing_node.primary_ip == secondary_ip or
2239
          existing_node.secondary_ip == secondary_ip):
2240
        raise errors.OpPrereqError("New node ip address(es) conflict with"
2241
                                   " existing node %s" % existing_node.name)
2242

    
2243
    # check that the type of the node (single versus dual homed) is the
2244
    # same as for the master
2245
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2246
    master_singlehomed = myself.secondary_ip == myself.primary_ip
2247
    newbie_singlehomed = secondary_ip == primary_ip
2248
    if master_singlehomed != newbie_singlehomed:
2249
      if master_singlehomed:
2250
        raise errors.OpPrereqError("The master has no private ip but the"
2251
                                   " new node has one")
2252
      else:
2253
        raise errors.OpPrereqError("The master has a private ip but the"
2254
                                   " new node doesn't have one")
2255

    
2256
    # checks reachablity
2257
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2258
      raise errors.OpPrereqError("Node not reachable by ping")
2259

    
2260
    if not newbie_singlehomed:
2261
      # check reachability from my secondary ip to newbie's secondary ip
2262
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2263
                           source=myself.secondary_ip):
2264
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2265
                                   " based ping to noded port")
2266

    
2267
    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2268
    mc_now, _ = self.cfg.GetMasterCandidateStats()
2269
    master_candidate = mc_now < cp_size
2270

    
2271
    self.new_node = objects.Node(name=node,
2272
                                 primary_ip=primary_ip,
2273
                                 secondary_ip=secondary_ip,
2274
                                 master_candidate=master_candidate,
2275
                                 offline=False, drained=False)
2276

    
2277
  def Exec(self, feedback_fn):
2278
    """Adds the new node to the cluster.
2279

2280
    """
2281
    new_node = self.new_node
2282
    node = new_node.name
2283

    
2284
    # check connectivity
2285
    result = self.rpc.call_version([node])[node]
2286
    result.Raise("Can't get version information from node %s" % node)
2287
    if constants.PROTOCOL_VERSION == result.payload:
2288
      logging.info("Communication to node %s fine, sw version %s match",
2289
                   node, result.payload)
2290
    else:
2291
      raise errors.OpExecError("Version mismatch master version %s,"
2292
                               " node version %s" %
2293
                               (constants.PROTOCOL_VERSION, result.payload))
2294

    
2295
    # setup ssh on node
2296
    logging.info("Copy ssh key to node %s", node)
2297
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2298
    keyarray = []
2299
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2300
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2301
                priv_key, pub_key]
2302

    
2303
    for i in keyfiles:
2304
      f = open(i, 'r')
2305
      try:
2306
        keyarray.append(f.read())
2307
      finally:
2308
        f.close()
2309

    
2310
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2311
                                    keyarray[2],
2312
                                    keyarray[3], keyarray[4], keyarray[5])
2313
    result.Raise("Cannot transfer ssh keys to the new node")
2314

    
2315
    # Add node to our /etc/hosts, and add key to known_hosts
2316
    if self.cfg.GetClusterInfo().modify_etc_hosts:
2317
      utils.AddHostToEtcHosts(new_node.name)
2318

    
2319
    if new_node.secondary_ip != new_node.primary_ip:
2320
      result = self.rpc.call_node_has_ip_address(new_node.name,
2321
                                                 new_node.secondary_ip)
2322
      result.Raise("Failure checking secondary ip on node %s" % new_node.name,
2323
                   prereq=True)
2324
      if not result.payload:
2325
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2326
                                 " you gave (%s). Please fix and re-run this"
2327
                                 " command." % new_node.secondary_ip)
2328

    
2329
    node_verify_list = [self.cfg.GetMasterNode()]
2330
    node_verify_param = {
2331
      'nodelist': [node],
2332
      # TODO: do a node-net-test as well?
2333
    }
2334

    
2335
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2336
                                       self.cfg.GetClusterName())
2337
    for verifier in node_verify_list:
2338
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
2339
      nl_payload = result[verifier].payload['nodelist']
2340
      if nl_payload:
2341
        for failed in nl_payload:
2342
          feedback_fn("ssh/hostname verification failed %s -> %s" %
2343
                      (verifier, nl_payload[failed]))
2344
        raise errors.OpExecError("ssh/hostname verification failed.")
2345

    
2346
    if self.op.readd:
2347
      _RedistributeAncillaryFiles(self)
2348
      self.context.ReaddNode(new_node)
2349
    else:
2350
      _RedistributeAncillaryFiles(self, additional_nodes=[node])
2351
      self.context.AddNode(new_node)
2352

    
2353

    
2354
class LUSetNodeParams(LogicalUnit):
2355
  """Modifies the parameters of a node.
2356

2357
  """
2358
  HPATH = "node-modify"
2359
  HTYPE = constants.HTYPE_NODE
2360
  _OP_REQP = ["node_name"]
2361
  REQ_BGL = False
2362

    
2363
  def CheckArguments(self):
2364
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
2365
    if node_name is None:
2366
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2367
    self.op.node_name = node_name
2368
    _CheckBooleanOpField(self.op, 'master_candidate')
2369
    _CheckBooleanOpField(self.op, 'offline')
2370
    _CheckBooleanOpField(self.op, 'drained')
2371
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2372
    if all_mods.count(None) == 3:
2373
      raise errors.OpPrereqError("Please pass at least one modification")
2374
    if all_mods.count(True) > 1:
2375
      raise errors.OpPrereqError("Can't set the node into more than one"
2376
                                 " state at the same time")
2377

    
2378
  def ExpandNames(self):
2379
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2380

    
2381
  def BuildHooksEnv(self):
2382
    """Build hooks env.
2383

2384
    This runs on the master node.
2385

2386
    """
2387
    env = {
2388
      "OP_TARGET": self.op.node_name,
2389
      "MASTER_CANDIDATE": str(self.op.master_candidate),
2390
      "OFFLINE": str(self.op.offline),
2391
      "DRAINED": str(self.op.drained),
2392
      }
2393
    nl = [self.cfg.GetMasterNode(),
2394
          self.op.node_name]
2395
    return env, nl, nl
2396

    
2397
  def CheckPrereq(self):
2398
    """Check prerequisites.
2399

2400
    This only checks the instance list against the existing names.
2401

2402
    """
2403
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2404

    
2405
    if ((self.op.master_candidate == False or self.op.offline == True or
2406
         self.op.drained == True) and node.master_candidate):
2407
      # we will demote the node from master_candidate
2408
      if self.op.node_name == self.cfg.GetMasterNode():
2409
        raise errors.OpPrereqError("The master node has to be a"
2410
                                   " master candidate, online and not drained")
2411
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2412
      num_candidates, _ = self.cfg.GetMasterCandidateStats()
2413
      if num_candidates <= cp_size:
2414
        msg = ("Not enough master candidates (desired"
2415
               " %d, new value will be %d)" % (cp_size, num_candidates-1))
2416
        if self.op.force:
2417
          self.LogWarning(msg)
2418
        else:
2419
          raise errors.OpPrereqError(msg)
2420

    
2421
    if (self.op.master_candidate == True and
2422
        ((node.offline and not self.op.offline == False) or
2423
         (node.drained and not self.op.drained == False))):
2424
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2425
                                 " to master_candidate" % node.name)
2426

    
2427
    return
2428

    
2429
  def Exec(self, feedback_fn):
2430
    """Modifies a node.
2431

2432
    """
2433
    node = self.node
2434

    
2435
    result = []
2436
    changed_mc = False
2437

    
2438
    if self.op.offline is not None:
2439
      node.offline = self.op.offline
2440
      result.append(("offline", str(self.op.offline)))
2441
      if self.op.offline == True:
2442
        if node.master_candidate:
2443
          node.master_candidate = False
2444
          changed_mc = True
2445
          result.append(("master_candidate", "auto-demotion due to offline"))
2446
        if node.drained:
2447
          node.drained = False
2448
          result.append(("drained", "clear drained status due to offline"))
2449

    
2450
    if self.op.master_candidate is not None:
2451
      node.master_candidate = self.op.master_candidate
2452
      changed_mc = True
2453
      result.append(("master_candidate", str(self.op.master_candidate)))
2454
      if self.op.master_candidate == False:
2455
        rrc = self.rpc.call_node_demote_from_mc(node.name)
2456
        msg = rrc.fail_msg
2457
        if msg:
2458
          self.LogWarning("Node failed to demote itself: %s" % msg)
2459

    
2460
    if self.op.drained is not None:
2461
      node.drained = self.op.drained
2462
      result.append(("drained", str(self.op.drained)))
2463
      if self.op.drained == True:
2464
        if node.master_candidate:
2465
          node.master_candidate = False
2466
          changed_mc = True
2467
          result.append(("master_candidate", "auto-demotion due to drain"))
2468
        if node.offline:
2469
          node.offline = False
2470
          result.append(("offline", "clear offline status due to drain"))
2471

    
2472
    # this will trigger configuration file update, if needed
2473
    self.cfg.Update(node)
2474
    # this will trigger job queue propagation or cleanup
2475
    if changed_mc:
2476
      self.context.ReaddNode(node)
2477

    
2478
    return result
2479

    
2480

    
2481
class LUPowercycleNode(NoHooksLU):
2482
  """Powercycles a node.
2483

2484
  """
2485
  _OP_REQP = ["node_name", "force"]
2486
  REQ_BGL = False
2487

    
2488
  def CheckArguments(self):
2489
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
2490
    if node_name is None:
2491
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2492
    self.op.node_name = node_name
2493
    if node_name == self.cfg.GetMasterNode() and not self.op.force:
2494
      raise errors.OpPrereqError("The node is the master and the force"
2495
                                 " parameter was not set")
2496

    
2497
  def ExpandNames(self):
2498
    """Locking for PowercycleNode.
2499

2500
    This is a last-resource option and shouldn't block on other
2501
    jobs. Therefore, we grab no locks.
2502

2503
    """
2504
    self.needed_locks = {}
2505

    
2506
  def CheckPrereq(self):
2507
    """Check prerequisites.
2508

2509
    This LU has no prereqs.
2510

2511
    """
2512
    pass
2513

    
2514
  def Exec(self, feedback_fn):
2515
    """Reboots a node.
2516

2517
    """
2518
    result = self.rpc.call_node_powercycle(self.op.node_name,
2519
                                           self.cfg.GetHypervisorType())
2520
    result.Raise("Failed to schedule the reboot")
2521
    return result.payload
2522

    
2523

    
2524
class LUQueryClusterInfo(NoHooksLU):
2525
  """Query cluster configuration.
2526

2527
  """
2528
  _OP_REQP = []
2529
  REQ_BGL = False
2530

    
2531
  def ExpandNames(self):
2532
    self.needed_locks = {}
2533

    
2534
  def CheckPrereq(self):
2535
    """No prerequsites needed for this LU.
2536

2537
    """
2538
    pass
2539

    
2540
  def Exec(self, feedback_fn):
2541
    """Return cluster config.
2542

2543
    """
2544
    cluster = self.cfg.GetClusterInfo()
2545
    result = {
2546
      "software_version": constants.RELEASE_VERSION,
2547
      "protocol_version": constants.PROTOCOL_VERSION,
2548
      "config_version": constants.CONFIG_VERSION,
2549
      "os_api_version": constants.OS_API_VERSION,
2550
      "export_version": constants.EXPORT_VERSION,
2551
      "architecture": (platform.architecture()[0], platform.machine()),
2552
      "name": cluster.cluster_name,
2553
      "master": cluster.master_node,
2554
      "default_hypervisor": cluster.default_hypervisor,
2555
      "enabled_hypervisors": cluster.enabled_hypervisors,
2556
      "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2557
                        for hypervisor in cluster.enabled_hypervisors]),
2558
      "beparams": cluster.beparams,
2559
      "nicparams": cluster.nicparams,
2560
      "candidate_pool_size": cluster.candidate_pool_size,
2561
      "master_netdev": cluster.master_netdev,
2562
      "volume_group_name": cluster.volume_group_name,
2563
      "file_storage_dir": cluster.file_storage_dir,
2564
      }
2565

    
2566
    return result
2567

    
2568

    
2569
class LUQueryConfigValues(NoHooksLU):
2570
  """Return configuration values.
2571

2572
  """
2573
  _OP_REQP = []
2574
  REQ_BGL = False
2575
  _FIELDS_DYNAMIC = utils.FieldSet()
2576
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2577

    
2578
  def ExpandNames(self):
2579
    self.needed_locks = {}
2580

    
2581
    _CheckOutputFields(static=self._FIELDS_STATIC,
2582
                       dynamic=self._FIELDS_DYNAMIC,
2583
                       selected=self.op.output_fields)
2584

    
2585
  def CheckPrereq(self):
2586
    """No prerequisites.
2587

2588
    """
2589
    pass
2590

    
2591
  def Exec(self, feedback_fn):
2592
    """Dump a representation of the cluster config to the standard output.
2593

2594
    """
2595
    values = []
2596
    for field in self.op.output_fields:
2597
      if field == "cluster_name":
2598
        entry = self.cfg.GetClusterName()
2599
      elif field == "master_node":
2600
        entry = self.cfg.GetMasterNode()
2601
      elif field == "drain_flag":
2602
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2603
      else:
2604
        raise errors.ParameterError(field)
2605
      values.append(entry)
2606
    return values
2607

    
2608

    
2609
class LUActivateInstanceDisks(NoHooksLU):
2610
  """Bring up an instance's disks.
2611

2612
  """
2613
  _OP_REQP = ["instance_name"]
2614
  REQ_BGL = False
2615

    
2616
  def ExpandNames(self):
2617
    self._ExpandAndLockInstance()
2618
    self.needed_locks[locking.LEVEL_NODE] = []
2619
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2620

    
2621
  def DeclareLocks(self, level):
2622
    if level == locking.LEVEL_NODE:
2623
      self._LockInstancesNodes()
2624

    
2625
  def CheckPrereq(self):
2626
    """Check prerequisites.
2627

2628
    This checks that the instance is in the cluster.
2629

2630
    """
2631
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2632
    assert self.instance is not None, \
2633
      "Cannot retrieve locked instance %s" % self.op.instance_name
2634
    _CheckNodeOnline(self, self.instance.primary_node)
2635

    
2636
  def Exec(self, feedback_fn):
2637
    """Activate the disks.
2638

2639
    """
2640
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2641
    if not disks_ok:
2642
      raise errors.OpExecError("Cannot activate block devices")
2643

    
2644
    return disks_info
2645

    
2646

    
2647
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2648
  """Prepare the block devices for an instance.
2649

2650
  This sets up the block devices on all nodes.
2651

2652
  @type lu: L{LogicalUnit}
2653
  @param lu: the logical unit on whose behalf we execute
2654
  @type instance: L{objects.Instance}
2655
  @param instance: the instance for whose disks we assemble
2656
  @type ignore_secondaries: boolean
2657
  @param ignore_secondaries: if true, errors on secondary nodes
2658
      won't result in an error return from the function
2659
  @return: False if the operation failed, otherwise a list of
2660
      (host, instance_visible_name, node_visible_name)
2661
      with the mapping from node devices to instance devices
2662

2663
  """
2664
  device_info = []
2665
  disks_ok = True
2666
  iname = instance.name
2667
  # With the two passes mechanism we try to reduce the window of
2668
  # opportunity for the race condition of switching DRBD to primary
2669
  # before handshaking occured, but we do not eliminate it
2670

    
2671
  # The proper fix would be to wait (with some limits) until the
2672
  # connection has been made and drbd transitions from WFConnection
2673
  # into any other network-connected state (Connected, SyncTarget,
2674
  # SyncSource, etc.)
2675

    
2676
  # 1st pass, assemble on all nodes in secondary mode
2677
  for inst_disk in instance.disks:
2678
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2679
      lu.cfg.SetDiskID(node_disk, node)
2680
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2681
      msg = result.fail_msg
2682
      if msg:
2683
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2684
                           " (is_primary=False, pass=1): %s",
2685
                           inst_disk.iv_name, node, msg)
2686
        if not ignore_secondaries:
2687
          disks_ok = False
2688

    
2689
  # FIXME: race condition on drbd migration to primary
2690

    
2691
  # 2nd pass, do only the primary node
2692
  for inst_disk in instance.disks:
2693
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2694
      if node != instance.primary_node:
2695
        continue
2696
      lu.cfg.SetDiskID(node_disk, node)
2697
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2698
      msg = result.fail_msg
2699
      if msg:
2700
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2701
                           " (is_primary=True, pass=2): %s",
2702
                           inst_disk.iv_name, node, msg)
2703
        disks_ok = False
2704
    device_info.append((instance.primary_node, inst_disk.iv_name,
2705
                        result.payload))
2706

    
2707
  # leave the disks configured for the primary node
2708
  # this is a workaround that would be fixed better by
2709
  # improving the logical/physical id handling
2710
  for disk in instance.disks:
2711
    lu.cfg.SetDiskID(disk, instance.primary_node)
2712

    
2713
  return disks_ok, device_info
2714

    
2715

    
2716
def _StartInstanceDisks(lu, instance, force):
2717
  """Start the disks of an instance.
2718

2719
  """
2720
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2721
                                           ignore_secondaries=force)
2722
  if not disks_ok:
2723
    _ShutdownInstanceDisks(lu, instance)
2724
    if force is not None and not force:
2725
      lu.proc.LogWarning("", hint="If the message above refers to a"
2726
                         " secondary node,"
2727
                         " you can retry the operation using '--force'.")
2728
    raise errors.OpExecError("Disk consistency error")
2729

    
2730

    
2731
class LUDeactivateInstanceDisks(NoHooksLU):
2732
  """Shutdown an instance's disks.
2733

2734
  """
2735
  _OP_REQP = ["instance_name"]
2736
  REQ_BGL = False
2737

    
2738
  def ExpandNames(self):
2739
    self._ExpandAndLockInstance()
2740
    self.needed_locks[locking.LEVEL_NODE] = []
2741
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2742

    
2743
  def DeclareLocks(self, level):
2744
    if level == locking.LEVEL_NODE:
2745
      self._LockInstancesNodes()
2746

    
2747
  def CheckPrereq(self):
2748
    """Check prerequisites.
2749

2750
    This checks that the instance is in the cluster.
2751

2752
    """
2753
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2754
    assert self.instance is not None, \
2755
      "Cannot retrieve locked instance %s" % self.op.instance_name
2756

    
2757
  def Exec(self, feedback_fn):
2758
    """Deactivate the disks
2759

2760
    """
2761
    instance = self.instance
2762
    _SafeShutdownInstanceDisks(self, instance)
2763

    
2764

    
2765
def _SafeShutdownInstanceDisks(lu, instance):
2766
  """Shutdown block devices of an instance.
2767

2768
  This function checks if an instance is running, before calling
2769
  _ShutdownInstanceDisks.
2770

2771
  """
2772
  pnode = instance.primary_node
2773
  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
2774
  ins_l.Raise("Can't contact node %s" % pnode)
2775

    
2776
  if instance.name in ins_l.payload:
2777
    raise errors.OpExecError("Instance is running, can't shutdown"
2778
                             " block devices.")
2779

    
2780
  _ShutdownInstanceDisks(lu, instance)
2781

    
2782

    
2783
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2784
  """Shutdown block devices of an instance.
2785

2786
  This does the shutdown on all nodes of the instance.
2787

2788
  If the ignore_primary is false, errors on the primary node are
2789
  ignored.
2790

2791
  """
2792
  all_result = True
2793
  for disk in instance.disks:
2794
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2795
      lu.cfg.SetDiskID(top_disk, node)
2796
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2797
      msg = result.fail_msg
2798
      if msg:
2799
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2800
                      disk.iv_name, node, msg)
2801
        if not ignore_primary or node != instance.primary_node:
2802
          all_result = False
2803
  return all_result
2804

    
2805

    
2806
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2807
  """Checks if a node has enough free memory.
2808

2809
  This function check if a given node has the needed amount of free
2810
  memory. In case the node has less memory or we cannot get the
2811
  information from the node, this function raise an OpPrereqError
2812
  exception.
2813

2814
  @type lu: C{LogicalUnit}
2815
  @param lu: a logical unit from which we get configuration data
2816
  @type node: C{str}
2817
  @param node: the node to check
2818
  @type reason: C{str}
2819
  @param reason: string to use in the error message
2820
  @type requested: C{int}
2821
  @param requested: the amount of memory in MiB to check for
2822
  @type hypervisor_name: C{str}
2823
  @param hypervisor_name: the hypervisor to ask for memory stats
2824
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2825
      we cannot check the node
2826

2827
  """
2828
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2829
  nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True)
2830
  free_mem = nodeinfo[node].payload.get('memory_free', None)
2831
  if not isinstance(free_mem, int):
2832
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2833
                               " was '%s'" % (node, free_mem))
2834
  if requested > free_mem:
2835
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2836
                               " needed %s MiB, available %s MiB" %
2837
                               (node, reason, requested, free_mem))
2838

    
2839

    
2840
class LUStartupInstance(LogicalUnit):
2841
  """Starts an instance.
2842

2843
  """
2844
  HPATH = "instance-start"
2845
  HTYPE = constants.HTYPE_INSTANCE
2846
  _OP_REQP = ["instance_name", "force"]
2847
  REQ_BGL = False
2848

    
2849
  def ExpandNames(self):
2850
    self._ExpandAndLockInstance()
2851

    
2852
  def BuildHooksEnv(self):
2853
    """Build hooks env.
2854

2855
    This runs on master, primary and secondary nodes of the instance.
2856

2857
    """
2858
    env = {
2859
      "FORCE": self.op.force,
2860
      }
2861
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2862
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2863
    return env, nl, nl
2864

    
2865
  def CheckPrereq(self):
2866
    """Check prerequisites.
2867

2868
    This checks that the instance is in the cluster.
2869

2870
    """
2871
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2872
    assert self.instance is not None, \
2873
      "Cannot retrieve locked instance %s" % self.op.instance_name
2874

    
2875
    # extra beparams
2876
    self.beparams = getattr(self.op, "beparams", {})
2877
    if self.beparams:
2878
      if not isinstance(self.beparams, dict):
2879
        raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2880
                                   " dict" % (type(self.beparams), ))
2881
      # fill the beparams dict
2882
      utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2883
      self.op.beparams = self.beparams
2884

    
2885
    # extra hvparams
2886
    self.hvparams = getattr(self.op, "hvparams", {})
2887
    if self.hvparams:
2888
      if not isinstance(self.hvparams, dict):
2889
        raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2890
                                   " dict" % (type(self.hvparams), ))
2891

    
2892
      # check hypervisor parameter syntax (locally)
2893
      cluster = self.cfg.GetClusterInfo()
2894
      utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2895
      filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
2896
                                    instance.hvparams)
2897
      filled_hvp.update(self.hvparams)
2898
      hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2899
      hv_type.CheckParameterSyntax(filled_hvp)
2900
      _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2901
      self.op.hvparams = self.hvparams
2902

    
2903
    _CheckNodeOnline(self, instance.primary_node)
2904

    
2905
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2906
    # check bridges existance
2907
    _CheckInstanceBridgesExist(self, instance)
2908

    
2909
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2910
                                              instance.name,
2911
                                              instance.hypervisor)
2912
    remote_info.Raise("Error checking node %s" % instance.primary_node,
2913
                      prereq=True)
2914
    if not remote_info.payload: # not running already
2915
      _CheckNodeFreeMemory(self, instance.primary_node,
2916
                           "starting instance %s" % instance.name,
2917
                           bep[constants.BE_MEMORY], instance.hypervisor)
2918

    
2919
  def Exec(self, feedback_fn):
2920
    """Start the instance.
2921

2922
    """
2923
    instance = self.instance
2924
    force = self.op.force
2925

    
2926
    self.cfg.MarkInstanceUp(instance.name)
2927

    
2928
    node_current = instance.primary_node
2929

    
2930
    _StartInstanceDisks(self, instance, force)
2931

    
2932
    result = self.rpc.call_instance_start(node_current, instance,
2933
                                          self.hvparams, self.beparams)
2934
    msg = result.fail_msg
2935
    if msg:
2936
      _ShutdownInstanceDisks(self, instance)
2937
      raise errors.OpExecError("Could not start instance: %s" % msg)


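# Illustrative sketch (not part of the original module): the precedence used
# when LUStartupInstance.CheckPrereq above computes filled_hvp, shown with
# plain dicts -- cluster defaults first, then instance-level overrides, then
# the per-start overrides from the opcode.  The helper name is made up for
# this example.
def _ExampleEffectiveHvParams(cluster_hv, instance_hv, op_hv):
  """Return the effective hvparams dict; later sources win."""
  filled = {}
  filled.update(cluster_hv)
  filled.update(instance_hv)
  filled.update(op_hv)
  return filled
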
class LURebootInstance(LogicalUnit):
2941
  """Reboot an instance.
2942

2943
  """
2944
  HPATH = "instance-reboot"
2945
  HTYPE = constants.HTYPE_INSTANCE
2946
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2947
  REQ_BGL = False
2948

    
2949
  def ExpandNames(self):
2950
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2951
                                   constants.INSTANCE_REBOOT_HARD,
2952
                                   constants.INSTANCE_REBOOT_FULL]:
2953
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2954
                                  (constants.INSTANCE_REBOOT_SOFT,
2955
                                   constants.INSTANCE_REBOOT_HARD,
2956
                                   constants.INSTANCE_REBOOT_FULL))
2957
    self._ExpandAndLockInstance()
2958

    
2959
  def BuildHooksEnv(self):
2960
    """Build hooks env.
2961

2962
    This runs on master, primary and secondary nodes of the instance.
2963

2964
    """
2965
    env = {
2966
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2967
      "REBOOT_TYPE": self.op.reboot_type,
2968
      }
2969
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2970
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2971
    return env, nl, nl
2972

    
2973
  def CheckPrereq(self):
2974
    """Check prerequisites.
2975

2976
    This checks that the instance is in the cluster.
2977

2978
    """
2979
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2980
    assert self.instance is not None, \
2981
      "Cannot retrieve locked instance %s" % self.op.instance_name
2982

    
2983
    _CheckNodeOnline(self, instance.primary_node)
2984

    
2985
    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)
2987

    
2988
  def Exec(self, feedback_fn):
2989
    """Reboot the instance.
2990

2991
    """
2992
    instance = self.instance
2993
    ignore_secondaries = self.op.ignore_secondaries
2994
    reboot_type = self.op.reboot_type
2995

    
2996
    node_current = instance.primary_node
2997

    
2998
    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2999
                       constants.INSTANCE_REBOOT_HARD]:
3000
      for disk in instance.disks:
3001
        self.cfg.SetDiskID(disk, node_current)
3002
      result = self.rpc.call_instance_reboot(node_current, instance,
3003
                                             reboot_type)
3004
      result.Raise("Could not reboot instance")
3005
    else:
3006
      result = self.rpc.call_instance_shutdown(node_current, instance)
3007
      result.Raise("Could not shutdown instance for full reboot")
3008
      _ShutdownInstanceDisks(self, instance)
3009
      _StartInstanceDisks(self, instance, ignore_secondaries)
3010
      result = self.rpc.call_instance_start(node_current, instance, None, None)
3011
      msg = result.fail_msg
3012
      if msg:
3013
        _ShutdownInstanceDisks(self, instance)
3014
        raise errors.OpExecError("Could not start instance for"
3015
                                 " full reboot: %s" % msg)
3016

    
3017
    self.cfg.MarkInstanceUp(instance.name)


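# Illustrative helper (example only, not in the original file): the split
# LURebootInstance.Exec above makes between reboot types that are delegated
# to the hypervisor and INSTANCE_REBOOT_FULL, which is a stop/start cycle.
def _ExampleIsHypervisorReboot(reboot_type):
  """Return True if the reboot type is handled by call_instance_reboot."""
  return reboot_type in (constants.INSTANCE_REBOOT_SOFT,
                         constants.INSTANCE_REBOOT_HARD)
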
class LUShutdownInstance(LogicalUnit):
3021
  """Shutdown an instance.
3022

3023
  """
3024
  HPATH = "instance-stop"
3025
  HTYPE = constants.HTYPE_INSTANCE
3026
  _OP_REQP = ["instance_name"]
3027
  REQ_BGL = False
3028

    
3029
  def ExpandNames(self):
3030
    self._ExpandAndLockInstance()
3031

    
3032
  def BuildHooksEnv(self):
3033
    """Build hooks env.
3034

3035
    This runs on master, primary and secondary nodes of the instance.
3036

3037
    """
3038
    env = _BuildInstanceHookEnvByObject(self, self.instance)
3039
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3040
    return env, nl, nl
3041

    
3042
  def CheckPrereq(self):
3043
    """Check prerequisites.
3044

3045
    This checks that the instance is in the cluster.
3046

3047
    """
3048
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3049
    assert self.instance is not None, \
3050
      "Cannot retrieve locked instance %s" % self.op.instance_name
3051
    _CheckNodeOnline(self, self.instance.primary_node)
3052

    
3053
  def Exec(self, feedback_fn):
3054
    """Shutdown the instance.
3055

3056
    """
3057
    instance = self.instance
3058
    node_current = instance.primary_node
3059
    self.cfg.MarkInstanceDown(instance.name)
3060
    result = self.rpc.call_instance_shutdown(node_current, instance)
3061
    msg = result.fail_msg
3062
    if msg:
3063
      self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3064

    
3065
    _ShutdownInstanceDisks(self, instance)
3066

    
3067

    
3068
class LUReinstallInstance(LogicalUnit):
3069
  """Reinstall an instance.
3070

3071
  """
3072
  HPATH = "instance-reinstall"
3073
  HTYPE = constants.HTYPE_INSTANCE
3074
  _OP_REQP = ["instance_name"]
3075
  REQ_BGL = False
3076

    
3077
  def ExpandNames(self):
3078
    self._ExpandAndLockInstance()
3079

    
3080
  def BuildHooksEnv(self):
3081
    """Build hooks env.
3082

3083
    This runs on master, primary and secondary nodes of the instance.
3084

3085
    """
3086
    env = _BuildInstanceHookEnvByObject(self, self.instance)
3087
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3088
    return env, nl, nl
3089

    
3090
  def CheckPrereq(self):
3091
    """Check prerequisites.
3092

3093
    This checks that the instance is in the cluster and is not running.
3094

3095
    """
3096
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3097
    assert instance is not None, \
3098
      "Cannot retrieve locked instance %s" % self.op.instance_name
3099
    _CheckNodeOnline(self, instance.primary_node)
3100

    
3101
    if instance.disk_template == constants.DT_DISKLESS:
3102
      raise errors.OpPrereqError("Instance '%s' has no disks" %
3103
                                 self.op.instance_name)
3104
    if instance.admin_up:
3105
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3106
                                 self.op.instance_name)
3107
    remote_info = self.rpc.call_instance_info(instance.primary_node,
3108
                                              instance.name,
3109
                                              instance.hypervisor)
3110
    remote_info.Raise("Error checking node %s" % instance.primary_node,
3111
                      prereq=True)
3112
    if remote_info.payload:
3113
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3114
                                 (self.op.instance_name,
3115
                                  instance.primary_node))
3116

    
3117
    self.op.os_type = getattr(self.op, "os_type", None)
3118
    if self.op.os_type is not None:
3119
      # OS verification
3120
      pnode = self.cfg.GetNodeInfo(
3121
        self.cfg.ExpandNodeName(instance.primary_node))
3122
      if pnode is None:
3123
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
3124
                                   self.op.pnode)
3125
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3126
      result.Raise("OS '%s' not in supported OS list for primary node %s" %
3127
                   (self.op.os_type, pnode.name), prereq=True)
3128

    
3129
    self.instance = instance
3130

    
3131
  def Exec(self, feedback_fn):
3132
    """Reinstall the instance.
3133

3134
    """
3135
    inst = self.instance
3136

    
3137
    if self.op.os_type is not None:
3138
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3139
      inst.os = self.op.os_type
3140
      self.cfg.Update(inst)
3141

    
3142
    _StartInstanceDisks(self, inst, None)
3143
    try:
3144
      feedback_fn("Running the instance OS create scripts...")
3145
      result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3146
      result.Raise("Could not install OS for instance %s on node %s" %
3147
                   (inst.name, inst.primary_node))
3148
    finally:
3149
      _ShutdownInstanceDisks(self, inst)
3150

    
3151

    
3152
class LURenameInstance(LogicalUnit):
3153
  """Rename an instance.
3154

3155
  """
3156
  HPATH = "instance-rename"
3157
  HTYPE = constants.HTYPE_INSTANCE
3158
  _OP_REQP = ["instance_name", "new_name"]
3159

    
3160
  def BuildHooksEnv(self):
3161
    """Build hooks env.
3162

3163
    This runs on master, primary and secondary nodes of the instance.
3164

3165
    """
3166
    env = _BuildInstanceHookEnvByObject(self, self.instance)
3167
    env["INSTANCE_NEW_NAME"] = self.op.new_name
3168
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3169
    return env, nl, nl
3170

    
3171
  def CheckPrereq(self):
3172
    """Check prerequisites.
3173

3174
    This checks that the instance is in the cluster and is not running.
3175

3176
    """
3177
    instance = self.cfg.GetInstanceInfo(
3178
      self.cfg.ExpandInstanceName(self.op.instance_name))
3179
    if instance is None:
3180
      raise errors.OpPrereqError("Instance '%s' not known" %
3181
                                 self.op.instance_name)
3182
    _CheckNodeOnline(self, instance.primary_node)
3183

    
3184
    if instance.admin_up:
3185
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3186
                                 self.op.instance_name)
3187
    remote_info = self.rpc.call_instance_info(instance.primary_node,
3188
                                              instance.name,
3189
                                              instance.hypervisor)
3190
    remote_info.Raise("Error checking node %s" % instance.primary_node,
3191
                      prereq=True)
3192
    if remote_info.payload:
3193
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3194
                                 (self.op.instance_name,
3195
                                  instance.primary_node))
3196
    self.instance = instance
3197

    
3198
    # new name verification
3199
    name_info = utils.HostInfo(self.op.new_name)
3200

    
3201
    self.op.new_name = new_name = name_info.name
3202
    instance_list = self.cfg.GetInstanceList()
3203
    if new_name in instance_list:
3204
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3205
                                 new_name)
3206

    
3207
    if not getattr(self.op, "ignore_ip", False):
3208
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3209
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
3210
                                   (name_info.ip, new_name))
3211

    
3212

    
3213
  def Exec(self, feedback_fn):
3214
    """Reinstall the instance.
3215

3216
    """
3217
    inst = self.instance
3218
    old_name = inst.name
3219

    
3220
    if inst.disk_template == constants.DT_FILE:
3221
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3222

    
3223
    self.cfg.RenameInstance(inst.name, self.op.new_name)
3224
    # Change the instance lock. This is definitely safe while we hold the BGL
3225
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3226
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3227

    
3228
    # re-read the instance from the configuration after rename
3229
    inst = self.cfg.GetInstanceInfo(self.op.new_name)
3230

    
3231
    if inst.disk_template == constants.DT_FILE:
3232
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3233
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3234
                                                     old_file_storage_dir,
3235
                                                     new_file_storage_dir)
3236
      result.Raise("Could not rename on node %s directory '%s' to '%s'"
3237
                   " (but the instance has been renamed in Ganeti)" %
3238
                   (inst.primary_node, old_file_storage_dir,
3239
                    new_file_storage_dir))
3240

    
3241
    _StartInstanceDisks(self, inst, None)
3242
    try:
3243
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3244
                                                 old_name)
3245
      msg = result.fail_msg
3246
      if msg:
3247
        msg = ("Could not run OS rename script for instance %s on node %s"
3248
               " (but the instance has been renamed in Ganeti): %s" %
3249
               (inst.name, inst.primary_node, msg))
3250
        self.proc.LogWarning(msg)
3251
    finally:
3252
      _ShutdownInstanceDisks(self, inst)


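# Illustrative summary (not in the original source): LURenameInstance.Exec
# above proceeds by renaming the instance in the configuration, swapping the
# instance lock to the new name, re-reading the instance object, renaming the
# file storage directory for file-based disk templates, and finally running
# the OS rename script with the disks temporarily activated.
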
class LURemoveInstance(LogicalUnit):
3256
  """Remove an instance.
3257

3258
  """
3259
  HPATH = "instance-remove"
3260
  HTYPE = constants.HTYPE_INSTANCE
3261
  _OP_REQP = ["instance_name", "ignore_failures"]
3262
  REQ_BGL = False
3263

    
3264
  def ExpandNames(self):
3265
    self._ExpandAndLockInstance()
3266
    self.needed_locks[locking.LEVEL_NODE] = []
3267
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3268

    
3269
  def DeclareLocks(self, level):
3270
    if level == locking.LEVEL_NODE:
3271
      self._LockInstancesNodes()
3272

    
3273
  def BuildHooksEnv(self):
3274
    """Build hooks env.
3275

3276
    This runs on master, primary and secondary nodes of the instance.
3277

3278
    """
3279
    env = _BuildInstanceHookEnvByObject(self, self.instance)
3280
    nl = [self.cfg.GetMasterNode()]
3281
    return env, nl, nl
3282

    
3283
  def CheckPrereq(self):
3284
    """Check prerequisites.
3285

3286
    This checks that the instance is in the cluster.
3287

3288
    """
3289
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3290
    assert self.instance is not None, \
3291
      "Cannot retrieve locked instance %s" % self.op.instance_name
3292

    
3293
  def Exec(self, feedback_fn):
3294
    """Remove the instance.
3295

3296
    """
3297
    instance = self.instance
3298
    logging.info("Shutting down instance %s on node %s",
3299
                 instance.name, instance.primary_node)
3300

    
3301
    result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3302
    msg = result.fail_msg
3303
    if msg:
3304
      if self.op.ignore_failures:
3305
        feedback_fn("Warning: can't shutdown instance: %s" % msg)
3306
      else:
3307
        raise errors.OpExecError("Could not shutdown instance %s on"
3308
                                 " node %s: %s" %
3309
                                 (instance.name, instance.primary_node, msg))
3310

    
3311
    logging.info("Removing block devices for instance %s", instance.name)
3312

    
3313
    if not _RemoveDisks(self, instance):
3314
      if self.op.ignore_failures:
3315
        feedback_fn("Warning: can't remove instance's disks")
3316
      else:
3317
        raise errors.OpExecError("Can't remove instance's disks")
3318

    
3319
    logging.info("Removing instance %s out of cluster config", instance.name)
3320

    
3321
    self.cfg.RemoveInstance(instance.name)
3322
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3323

    
3324

    
3325
class LUQueryInstances(NoHooksLU):
3326
  """Logical unit for querying instances.
3327

3328
  """
3329
  _OP_REQP = ["output_fields", "names", "use_locking"]
3330
  REQ_BGL = False
3331
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3332
                                    "admin_state",
3333
                                    "disk_template", "ip", "mac", "bridge",
3334
                                    "sda_size", "sdb_size", "vcpus", "tags",
3335
                                    "network_port", "beparams",
3336
                                    r"(disk)\.(size)/([0-9]+)",
3337
                                    r"(disk)\.(sizes)", "disk_usage",
3338
                                    r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3339
                                    r"(nic)\.(macs|ips|bridges)",
3340
                                    r"(disk|nic)\.(count)",
3341
                                    "serial_no", "hypervisor", "hvparams",] +
3342
                                  ["hv/%s" % name
3343
                                   for name in constants.HVS_PARAMETERS] +
3344
                                  ["be/%s" % name
3345
                                   for name in constants.BES_PARAMETERS])
3346
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3347

    
3348

    
3349
  def ExpandNames(self):
3350
    _CheckOutputFields(static=self._FIELDS_STATIC,
3351
                       dynamic=self._FIELDS_DYNAMIC,
3352
                       selected=self.op.output_fields)
3353

    
3354
    self.needed_locks = {}
3355
    self.share_locks[locking.LEVEL_INSTANCE] = 1
3356
    self.share_locks[locking.LEVEL_NODE] = 1
3357

    
3358
    if self.op.names:
3359
      self.wanted = _GetWantedInstances(self, self.op.names)
3360
    else:
3361
      self.wanted = locking.ALL_SET
3362

    
3363
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3364
    self.do_locking = self.do_node_query and self.op.use_locking
3365
    if self.do_locking:
3366
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3367
      self.needed_locks[locking.LEVEL_NODE] = []
3368
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3369

    
3370
  def DeclareLocks(self, level):
3371
    if level == locking.LEVEL_NODE and self.do_locking:
3372
      self._LockInstancesNodes()
3373

    
3374
  def CheckPrereq(self):
3375
    """Check prerequisites.
3376

3377
    """
3378
    pass
3379

    
3380
  def Exec(self, feedback_fn):
3381
    """Computes the list of nodes and their attributes.
3382

3383
    """
3384
    all_info = self.cfg.GetAllInstancesInfo()
3385
    if self.wanted == locking.ALL_SET:
3386
      # caller didn't specify instance names, so ordering is not important
3387
      if self.do_locking:
3388
        instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3389
      else:
3390
        instance_names = all_info.keys()
3391
      instance_names = utils.NiceSort(instance_names)
3392
    else:
3393
      # caller did specify names, so we must keep the ordering
3394
      if self.do_locking:
3395
        tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3396
      else:
3397
        tgt_set = all_info.keys()
3398
      missing = set(self.wanted).difference(tgt_set)
3399
      if missing:
3400
        raise errors.OpExecError("Some instances were removed before"
3401
                                 " retrieving their data: %s" % missing)
3402
      instance_names = self.wanted
3403

    
3404
    instance_list = [all_info[iname] for iname in instance_names]
3405

    
3406
    # begin data gathering
3407

    
3408
    nodes = frozenset([inst.primary_node for inst in instance_list])
3409
    hv_list = list(set([inst.hypervisor for inst in instance_list]))
3410

    
3411
    bad_nodes = []
3412
    off_nodes = []
3413
    if self.do_node_query:
3414
      live_data = {}
3415
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3416
      for name in nodes:
3417
        result = node_data[name]
3418
        if result.offline:
3419
          # offline nodes will be in both lists
3420
          off_nodes.append(name)
3421
        if result.failed or result.fail_msg:
3422
          bad_nodes.append(name)
3423
        else:
3424
          if result.payload:
3425
            live_data.update(result.payload)
3426
          # else no instance is alive
3427
    else:
3428
      live_data = dict([(name, {}) for name in instance_names])
3429

    
3430
    # end data gathering
3431

    
3432
    HVPREFIX = "hv/"
3433
    BEPREFIX = "be/"
3434
    output = []
3435
    for instance in instance_list:
3436
      iout = []
3437
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3438
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
3439
      for field in self.op.output_fields:
3440
        st_match = self._FIELDS_STATIC.Matches(field)
3441
        if field == "name":
3442
          val = instance.name
3443
        elif field == "os":
3444
          val = instance.os
3445
        elif field == "pnode":
3446
          val = instance.primary_node
3447
        elif field == "snodes":
3448
          val = list(instance.secondary_nodes)
3449
        elif field == "admin_state":
3450
          val = instance.admin_up
3451
        elif field == "oper_state":
3452
          if instance.primary_node in bad_nodes:
3453
            val = None
3454
          else:
3455
            val = bool(live_data.get(instance.name))
3456
        elif field == "status":
3457
          if instance.primary_node in off_nodes:
3458
            val = "ERROR_nodeoffline"
3459
          elif instance.primary_node in bad_nodes:
3460
            val = "ERROR_nodedown"
3461
          else:
3462
            running = bool(live_data.get(instance.name))
3463
            if running:
3464
              if instance.admin_up:
3465
                val = "running"
3466
              else:
3467
                val = "ERROR_up"
3468
            else:
3469
              if instance.admin_up:
3470
                val = "ERROR_down"
3471
              else:
3472
                val = "ADMIN_down"
3473
        elif field == "oper_ram":
3474
          if instance.primary_node in bad_nodes:
3475
            val = None
3476
          elif instance.name in live_data:
3477
            val = live_data[instance.name].get("memory", "?")
3478
          else:
3479
            val = "-"
3480
        elif field == "disk_template":
3481
          val = instance.disk_template
3482
        elif field == "ip":
3483
          val = instance.nics[0].ip
3484
        elif field == "bridge":
3485
          val = instance.nics[0].bridge
3486
        elif field == "mac":
3487
          val = instance.nics[0].mac
3488
        elif field == "sda_size" or field == "sdb_size":
3489
          idx = ord(field[2]) - ord('a')
3490
          try:
3491
            val = instance.FindDisk(idx).size
3492
          except errors.OpPrereqError:
3493
            val = None
3494
        elif field == "disk_usage": # total disk usage per node
3495
          disk_sizes = [{'size': disk.size} for disk in instance.disks]
3496
          val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3497
        elif field == "tags":
3498
          val = list(instance.GetTags())
3499
        elif field == "serial_no":
3500
          val = instance.serial_no
3501
        elif field == "network_port":
3502
          val = instance.network_port
3503
        elif field == "hypervisor":
3504
          val = instance.hypervisor
3505
        elif field == "hvparams":
3506
          val = i_hv
3507
        elif (field.startswith(HVPREFIX) and
3508
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3509
          val = i_hv.get(field[len(HVPREFIX):], None)
3510
        elif field == "beparams":
3511
          val = i_be
3512
        elif (field.startswith(BEPREFIX) and
3513
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3514
          val = i_be.get(field[len(BEPREFIX):], None)
3515
        elif st_match and st_match.groups():
3516
          # matches a variable list
3517
          st_groups = st_match.groups()
3518
          if st_groups and st_groups[0] == "disk":
3519
            if st_groups[1] == "count":
3520
              val = len(instance.disks)
3521
            elif st_groups[1] == "sizes":
3522
              val = [disk.size for disk in instance.disks]
3523
            elif st_groups[1] == "size":
3524
              try:
3525
                val = instance.FindDisk(st_groups[2]).size
3526
              except errors.OpPrereqError:
3527
                val = None
3528
            else:
3529
              assert False, "Unhandled disk parameter"
3530
          elif st_groups[0] == "nic":
3531
            if st_groups[1] == "count":
3532
              val = len(instance.nics)
3533
            elif st_groups[1] == "macs":
3534
              val = [nic.mac for nic in instance.nics]
3535
            elif st_groups[1] == "ips":
3536
              val = [nic.ip for nic in instance.nics]
3537
            elif st_groups[1] == "bridges":
3538
              val = [nic.bridge for nic in instance.nics]
3539
            else:
3540
              # index-based item
3541
              nic_idx = int(st_groups[2])
3542
              if nic_idx >= len(instance.nics):
3543
                val = None
3544
              else:
3545
                if st_groups[1] == "mac":
3546
                  val = instance.nics[nic_idx].mac
3547
                elif st_groups[1] == "ip":
3548
                  val = instance.nics[nic_idx].ip
3549
                elif st_groups[1] == "bridge":
3550
                  val = instance.nics[nic_idx].bridge
3551
                else:
3552
                  assert False, "Unhandled NIC parameter"
3553
          else:
3554
            assert False, "Unhandled variable parameter"
3555
        else:
3556
          raise errors.ParameterError(field)
3557
        iout.append(val)
3558
      output.append(iout)
3559

    
3560
    return output


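# Illustrative sketch (example only): the decision table behind the "status"
# field computed in LUQueryInstances.Exec above, with the node and instance
# state passed in as plain booleans.
def _ExampleInstanceStatus(node_offline, node_bad, running, admin_up):
  """Return the status string the query above would report."""
  if node_offline:
    return "ERROR_nodeoffline"
  if node_bad:
    return "ERROR_nodedown"
  if running:
    if admin_up:
      return "running"
    return "ERROR_up"
  if admin_up:
    return "ERROR_down"
  return "ADMIN_down"
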
class LUFailoverInstance(LogicalUnit):
3564
  """Failover an instance.
3565

3566
  """
3567
  HPATH = "instance-failover"
3568
  HTYPE = constants.HTYPE_INSTANCE
3569
  _OP_REQP = ["instance_name", "ignore_consistency"]
3570
  REQ_BGL = False
3571

    
3572
  def ExpandNames(self):
3573
    self._ExpandAndLockInstance()
3574
    self.needed_locks[locking.LEVEL_NODE] = []
3575
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3576

    
3577
  def DeclareLocks(self, level):
3578
    if level == locking.LEVEL_NODE:
3579
      self._LockInstancesNodes()
3580

    
3581
  def BuildHooksEnv(self):
3582
    """Build hooks env.
3583

3584
    This runs on master, primary and secondary nodes of the instance.
3585

3586
    """
3587
    env = {
3588
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3589
      }
3590
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3591
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3592
    return env, nl, nl
3593

    
3594
  def CheckPrereq(self):
3595
    """Check prerequisites.
3596

3597
    This checks that the instance is in the cluster.
3598

3599
    """
3600
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3601
    assert self.instance is not None, \
3602
      "Cannot retrieve locked instance %s" % self.op.instance_name
3603

    
3604
    bep = self.cfg.GetClusterInfo().FillBE(instance)
3605
    if instance.disk_template not in constants.DTS_NET_MIRROR:
3606
      raise errors.OpPrereqError("Instance's disk layout is not"
3607
                                 " network mirrored, cannot failover.")
3608

    
3609
    secondary_nodes = instance.secondary_nodes
3610
    if not secondary_nodes:
3611
      raise errors.ProgrammerError("no secondary node but using "
3612
                                   "a mirrored disk template")
3613

    
3614
    target_node = secondary_nodes[0]
3615
    _CheckNodeOnline(self, target_node)
3616
    _CheckNodeNotDrained(self, target_node)
3617
    # check memory requirements on the secondary node
3618
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3619
                         instance.name, bep[constants.BE_MEMORY],
3620
                         instance.hypervisor)
3621
    # check bridge existence
    _CheckInstanceBridgesExist(self, instance, node=target_node)
3623

    
3624
  def Exec(self, feedback_fn):
3625
    """Failover an instance.
3626

3627
    The failover is done by shutting it down on its present node and
3628
    starting it on the secondary.
3629

3630
    """
3631
    instance = self.instance
3632

    
3633
    source_node = instance.primary_node
3634
    target_node = instance.secondary_nodes[0]
3635

    
3636
    feedback_fn("* checking disk consistency between source and target")
3637
    for dev in instance.disks:
3638
      # for drbd, these are drbd over lvm
3639
      if not _CheckDiskConsistency(self, dev, target_node, False):
3640
        if instance.admin_up and not self.op.ignore_consistency:
3641
          raise errors.OpExecError("Disk %s is degraded on target node,"
3642
                                   " aborting failover." % dev.iv_name)
3643

    
3644
    feedback_fn("* shutting down instance on source node")
3645
    logging.info("Shutting down instance %s on node %s",
3646
                 instance.name, source_node)
3647

    
3648
    result = self.rpc.call_instance_shutdown(source_node, instance)
3649
    msg = result.fail_msg
3650
    if msg:
3651
      if self.op.ignore_consistency:
3652
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
3653
                             " Proceeding anyway. Please make sure node"
3654
                             " %s is down. Error details: %s",
3655
                             instance.name, source_node, source_node, msg)
3656
      else:
3657
        raise errors.OpExecError("Could not shutdown instance %s on"
3658
                                 " node %s: %s" %
3659
                                 (instance.name, source_node, msg))
3660

    
3661
    feedback_fn("* deactivating the instance's disks on source node")
3662
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3663
      raise errors.OpExecError("Can't shut down the instance's disks.")
3664

    
3665
    instance.primary_node = target_node
3666
    # distribute new instance config to the other nodes
3667
    self.cfg.Update(instance)
3668

    
3669
    # Only start the instance if it's marked as up
3670
    if instance.admin_up:
3671
      feedback_fn("* activating the instance's disks on target node")
3672
      logging.info("Starting instance %s on node %s",
3673
                   instance.name, target_node)
3674

    
3675
      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3676
                                               ignore_secondaries=True)
3677
      if not disks_ok:
3678
        _ShutdownInstanceDisks(self, instance)
3679
        raise errors.OpExecError("Can't activate the instance's disks")
3680

    
3681
      feedback_fn("* starting the instance on the target node")
3682
      result = self.rpc.call_instance_start(target_node, instance, None, None)
3683
      msg = result.fail_msg
3684
      if msg:
3685
        _ShutdownInstanceDisks(self, instance)
3686
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3687
                                 (instance.name, target_node, msg))


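# Illustrative summary (not in the original source): LUFailoverInstance.Exec
# above checks disk consistency on the target, shuts the instance and its
# disks down on the current primary, flips primary_node in the configuration
# and, only if the instance was marked up, re-assembles the disks and starts
# it on the former secondary.
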
class LUMigrateInstance(LogicalUnit):
3691
  """Migrate an instance.
3692

3693
  This is migration without shutting down, compared to the failover,
3694
  which is done with shutdown.
3695

3696
  """
3697
  HPATH = "instance-migrate"
3698
  HTYPE = constants.HTYPE_INSTANCE
3699
  _OP_REQP = ["instance_name", "live", "cleanup"]
3700

    
3701
  REQ_BGL = False
3702

    
3703
  def ExpandNames(self):
3704
    self._ExpandAndLockInstance()
3705
    self.needed_locks[locking.LEVEL_NODE] = []
3706
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3707

    
3708
  def DeclareLocks(self, level):
3709
    if level == locking.LEVEL_NODE:
3710
      self._LockInstancesNodes()
3711

    
3712
  def BuildHooksEnv(self):
3713
    """Build hooks env.
3714

3715
    This runs on master, primary and secondary nodes of the instance.
3716

3717
    """
3718
    env = _BuildInstanceHookEnvByObject(self, self.instance)
3719
    env["MIGRATE_LIVE"] = self.op.live
3720
    env["MIGRATE_CLEANUP"] = self.op.cleanup
3721
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3722
    return env, nl, nl
3723

    
3724
  def CheckPrereq(self):
3725
    """Check prerequisites.
3726

3727
    This checks that the instance is in the cluster.
3728

3729
    """
3730
    instance = self.cfg.GetInstanceInfo(
3731
      self.cfg.ExpandInstanceName(self.op.instance_name))
3732
    if instance is None:
3733
      raise errors.OpPrereqError("Instance '%s' not known" %
3734
                                 self.op.instance_name)
3735

    
3736
    if instance.disk_template != constants.DT_DRBD8:
3737
      raise errors.OpPrereqError("Instance's disk layout is not"
3738
                                 " drbd8, cannot migrate.")
3739

    
3740
    secondary_nodes = instance.secondary_nodes
3741
    if not secondary_nodes:
3742
      raise errors.ConfigurationError("No secondary node but using"
3743
                                      " drbd8 disk template")
3744

    
3745
    i_be = self.cfg.GetClusterInfo().FillBE(instance)
3746

    
3747
    target_node = secondary_nodes[0]
3748
    # check memory requirements on the secondary node
3749
    _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3750
                         instance.name, i_be[constants.BE_MEMORY],
3751
                         instance.hypervisor)
3752

    
3753
    # check bridge existence
    _CheckInstanceBridgesExist(self, instance, node=target_node)
3755

    
3756
    if not self.op.cleanup:
3757
      _CheckNodeNotDrained(self, target_node)
3758
      result = self.rpc.call_instance_migratable(instance.primary_node,
3759
                                                 instance)
3760
      result.Raise("Can't migrate, please use failover", prereq=True)
3761

    
3762
    self.instance = instance
3763

    
3764
  def _WaitUntilSync(self):
3765
    """Poll with custom rpc for disk sync.
3766

3767
    This uses our own step-based rpc call.
3768

3769
    """
3770
    self.feedback_fn("* wait until resync is done")
3771
    all_done = False
3772
    while not all_done:
3773
      all_done = True
3774
      result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3775
                                            self.nodes_ip,
3776
                                            self.instance.disks)
3777
      min_percent = 100
3778
      for node, nres in result.items():
3779
        nres.Raise("Cannot resync disks on node %s" % node)
3780
        node_done, node_percent = nres.payload
3781
        all_done = all_done and node_done
3782
        if node_percent is not None:
3783
          min_percent = min(min_percent, node_percent)
3784
      if not all_done:
3785
        if min_percent < 100:
3786
          self.feedback_fn("   - progress: %.1f%%" % min_percent)
3787
        time.sleep(2)
3788

    
3789
  def _EnsureSecondary(self, node):
3790
    """Demote a node to secondary.
3791

3792
    """
3793
    self.feedback_fn("* switching node %s to secondary mode" % node)
3794

    
3795
    for dev in self.instance.disks:
3796
      self.cfg.SetDiskID(dev, node)
3797

    
3798
    result = self.rpc.call_blockdev_close(node, self.instance.name,
3799
                                          self.instance.disks)
3800
    result.Raise("Cannot change disk to secondary on node %s" % node)
3801

    
3802
  def _GoStandalone(self):
3803
    """Disconnect from the network.
3804

3805
    """
3806
    self.feedback_fn("* changing into standalone mode")
3807
    result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3808
                                               self.instance.disks)
3809
    for node, nres in result.items():
3810
      nres.Raise("Cannot disconnect disks node %s" % node)
3811

    
3812
  def _GoReconnect(self, multimaster):
3813
    """Reconnect to the network.
3814

3815
    """
3816
    if multimaster:
3817
      msg = "dual-master"
3818
    else:
3819
      msg = "single-master"
3820
    self.feedback_fn("* changing disks into %s mode" % msg)
3821
    result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3822
                                           self.instance.disks,
3823
                                           self.instance.name, multimaster)
3824
    for node, nres in result.items():
3825
      nres.Raise("Cannot change disks config on node %s" % node)
3826

    
3827
  def _ExecCleanup(self):
3828
    """Try to cleanup after a failed migration.
3829

3830
    The cleanup is done by:
3831
      - check that the instance is running only on one node
3832
        (and update the config if needed)
3833
      - change disks on its secondary node to secondary
3834
      - wait until disks are fully synchronized
3835
      - disconnect from the network
3836
      - change disks into single-master mode
3837
      - wait again until disks are fully synchronized
3838

3839
    """
3840
    instance = self.instance
3841
    target_node = self.target_node
3842
    source_node = self.source_node
3843

    
3844
    # check running on only one node
3845
    self.feedback_fn("* checking where the instance actually runs"
3846
                     " (if this hangs, the hypervisor might be in"
3847
                     " a bad state)")
3848
    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3849
    for node, result in ins_l.items():
3850
      result.Raise("Can't contact node %s" % node)
3851

    
3852
    runningon_source = instance.name in ins_l[source_node].payload
3853
    runningon_target = instance.name in ins_l[target_node].payload
3854

    
3855
    if runningon_source and runningon_target:
3856
      raise errors.OpExecError("Instance seems to be running on two nodes,"
3857
                               " or the hypervisor is confused. You will have"
3858
                               " to ensure manually that it runs only on one"
3859
                               " and restart this operation.")
3860

    
3861
    if not (runningon_source or runningon_target):
3862
      raise errors.OpExecError("Instance does not seem to be running at all."
3863
                               " In this case, it's safer to repair by"
3864
                               " running 'gnt-instance stop' to ensure disk"
3865
                               " shutdown, and then restarting it.")
3866

    
3867
    if runningon_target:
3868
      # the migration has actually succeeded, we need to update the config
3869
      self.feedback_fn("* instance running on secondary node (%s),"
3870
                       " updating config" % target_node)
3871
      instance.primary_node = target_node
3872
      self.cfg.Update(instance)
3873
      demoted_node = source_node
3874
    else:
3875
      self.feedback_fn("* instance confirmed to be running on its"
3876
                       " primary node (%s)" % source_node)
3877
      demoted_node = target_node
3878

    
3879
    self._EnsureSecondary(demoted_node)
3880
    try:
3881
      self._WaitUntilSync()
3882
    except errors.OpExecError:
3883
      # we ignore here errors, since if the device is standalone, it
3884
      # won't be able to sync
3885
      pass
3886
    self._GoStandalone()
3887
    self._GoReconnect(False)
3888
    self._WaitUntilSync()
3889

    
3890
    self.feedback_fn("* done")
3891

    
3892
  def _RevertDiskStatus(self):
3893
    """Try to revert the disk status after a failed migration.
3894

3895
    """
3896
    target_node = self.target_node
3897
    try:
3898
      self._EnsureSecondary(target_node)
3899
      self._GoStandalone()
3900
      self._GoReconnect(False)
3901
      self._WaitUntilSync()
3902
    except errors.OpExecError, err:
3903
      self.LogWarning("Migration failed and I can't reconnect the"
3904
                      " drives: error '%s'\n"
3905
                      "Please look and recover the instance status" %
3906
                      str(err))
3907

    
3908
  def _AbortMigration(self):
3909
    """Call the hypervisor code to abort a started migration.
3910

3911
    """
3912
    instance = self.instance
3913
    target_node = self.target_node
3914
    migration_info = self.migration_info
3915

    
3916
    abort_result = self.rpc.call_finalize_migration(target_node,
3917
                                                    instance,
3918
                                                    migration_info,
3919
                                                    False)
3920
    abort_msg = abort_result.fail_msg
3921
    if abort_msg:
3922
      logging.error("Aborting migration failed on target node %s: %s" %
3923
                    (target_node, abort_msg))
3924
      # Don't raise an exception here, as we still have to try to revert the
      # disk status, even if this step failed.
3926

    
3927
  def _ExecMigration(self):
3928
    """Migrate an instance.
3929

3930
    The migrate is done by:
3931
      - change the disks into dual-master mode
3932
      - wait until disks are fully synchronized again
3933
      - migrate the instance
3934
      - change disks on the new secondary node (the old primary) to secondary
3935
      - wait until disks are fully synchronized
3936
      - change disks into single-master mode
3937

3938
    """
3939
    instance = self.instance
3940
    target_node = self.target_node
3941
    source_node = self.source_node
3942

    
3943
    self.feedback_fn("* checking disk consistency between source and target")
3944
    for dev in instance.disks:
3945
      if not _CheckDiskConsistency(self, dev, target_node, False):
3946
        raise errors.OpExecError("Disk %s is degraded or not fully"
3947
                                 " synchronized on target node,"
3948
                                 " aborting migrate." % dev.iv_name)
3949

    
3950
    # First get the migration information from the remote node
3951
    result = self.rpc.call_migration_info(source_node, instance)
3952
    msg = result.fail_msg
3953
    if msg:
3954
      log_err = ("Failed fetching source migration information from %s: %s" %
3955
                 (source_node, msg))
3956
      logging.error(log_err)
3957
      raise errors.OpExecError(log_err)
3958

    
3959
    self.migration_info = migration_info = result.payload
3960

    
3961
    # Then switch the disks to master/master mode
3962
    self._EnsureSecondary(target_node)
3963
    self._GoStandalone()
3964
    self._GoReconnect(True)
3965
    self._WaitUntilSync()
3966

    
3967
    self.feedback_fn("* preparing %s to accept the instance" % target_node)
3968
    result = self.rpc.call_accept_instance(target_node,
3969
                                           instance,
3970
                                           migration_info,
3971
                                           self.nodes_ip[target_node])
3972

    
3973
    msg = result.fail_msg
3974
    if msg:
3975
      logging.error("Instance pre-migration failed, trying to revert"
3976
                    " disk status: %s", msg)
3977
      self._AbortMigration()
3978
      self._RevertDiskStatus()
3979
      raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
3980
                               (instance.name, msg))
3981

    
3982
    self.feedback_fn("* migrating instance to %s" % target_node)
3983
    time.sleep(10)
3984
    result = self.rpc.call_instance_migrate(source_node, instance,
3985
                                            self.nodes_ip[target_node],
3986
                                            self.op.live)
3987
    msg = result.fail_msg
3988
    if msg:
3989
      logging.error("Instance migration failed, trying to revert"
3990
                    " disk status: %s", msg)
3991
      self._AbortMigration()
3992
      self._RevertDiskStatus()
3993
      raise errors.OpExecError("Could not migrate instance %s: %s" %
3994
                               (instance.name, msg))
3995
    time.sleep(10)
3996

    
3997
    instance.primary_node = target_node
3998
    # distribute new instance config to the other nodes
3999
    self.cfg.Update(instance)
4000

    
4001
    result = self.rpc.call_finalize_migration(target_node,
4002
                                              instance,
4003
                                              migration_info,
4004
                                              True)
4005
    msg = result.fail_msg
4006
    if msg:
4007
      logging.error("Instance migration succeeded, but finalization failed:"
4008
                    " %s" % msg)
4009
      raise errors.OpExecError("Could not finalize instance migration: %s" %
4010
                               msg)
4011

    
4012
    self._EnsureSecondary(source_node)
4013
    self._WaitUntilSync()
4014
    self._GoStandalone()
4015
    self._GoReconnect(False)
4016
    self._WaitUntilSync()
4017

    
4018
    self.feedback_fn("* done")
4019

    
4020
  def Exec(self, feedback_fn):
4021
    """Perform the migration.
4022

4023
    """
4024
    self.feedback_fn = feedback_fn
4025

    
4026
    self.source_node = self.instance.primary_node
4027
    self.target_node = self.instance.secondary_nodes[0]
4028
    self.all_nodes = [self.source_node, self.target_node]
4029
    self.nodes_ip = {
4030
      self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4031
      self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4032
      }
4033
    if self.op.cleanup:
4034
      return self._ExecCleanup()
4035
    else:
4036
      return self._ExecMigration()


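# Illustrative helper (example only): how _WaitUntilSync in LUMigrateInstance
# above folds the per-node (done, percent) pairs returned by
# call_drbd_wait_sync into a single "all done" flag and the slowest node's
# progress.  The helper name and signature are made up for this sketch.
def _ExampleFoldSyncStatus(node_results):
  """node_results: iterable of (node_done, node_percent or None) pairs."""
  all_done = True
  min_percent = 100
  for node_done, node_percent in node_results:
    all_done = all_done and node_done
    if node_percent is not None:
      min_percent = min(min_percent, node_percent)
  return all_done, min_percent
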
def _CreateBlockDev(lu, node, instance, device, force_create,
4040
                    info, force_open):
4041
  """Create a tree of block devices on a given node.
4042

4043
  If this device type has to be created on secondaries, create it and
4044
  all its children.
4045

4046
  If not, just recurse to children keeping the same 'force' value.
4047

4048
  @param lu: the lu on whose behalf we execute
4049
  @param node: the node on which to create the device
4050
  @type instance: L{objects.Instance}
4051
  @param instance: the instance which owns the device
4052
  @type device: L{objects.Disk}
4053
  @param device: the device to create
4054
  @type force_create: boolean
4055
  @param force_create: whether to force creation of this device; this
4056
      will be changed to True whenever we find a device which has
      CreateOnSecondary() attribute
4058
  @param info: the extra 'metadata' we should attach to the device
4059
      (this will be represented as a LVM tag)
4060
  @type force_open: boolean
4061
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution

4066
  """
4067
  if device.CreateOnSecondary():
4068
    force_create = True
4069

    
4070
  if device.children:
4071
    for child in device.children:
4072
      _CreateBlockDev(lu, node, instance, child, force_create,
4073
                      info, force_open)
4074

    
4075
  if not force_create:
4076
    return
4077

    
4078
  _CreateSingleBlockDev(lu, node, instance, device, info, force_open)


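# Illustrative sketch (example only): the force_create propagation rule used
# by _CreateBlockDev above, reduced to a pure function that merely counts how
# many devices of a disk tree would be created on a secondary node.
def _ExampleCountSecondaryCreations(device, force_create=False):
  """Return the number of devices _CreateBlockDev would create."""
  if device.CreateOnSecondary():
    force_create = True
  count = 0
  for child in (device.children or []):
    count += _ExampleCountSecondaryCreations(child, force_create)
  if force_create:
    count += 1
  return count
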
def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4082
  """Create a single block device on a given node.
4083

4084
  This will not recurse over children of the device, so they must be
4085
  created in advance.
4086

4087
  @param lu: the lu on whose behalf we execute
4088
  @param node: the node on which to create the device
4089
  @type instance: L{objects.Instance}
4090
  @param instance: the instance which owns the device
4091
  @type device: L{objects.Disk}
4092
  @param device: the device to create
4093
  @param info: the extra 'metadata' we should attach to the device
4094
      (this will be represented as a LVM tag)
4095
  @type force_open: boolean
4096
  @param force_open: this parameter will be passes to the
4097
      L{backend.BlockdevCreate} function where it specifies
4098
      whether we run on primary or not, and it affects both
4099
      the child assembly and the device own Open() execution
4100

4101
  """
4102
  lu.cfg.SetDiskID(device, node)
4103
  result = lu.rpc.call_blockdev_create(node, device, device.size,
4104
                                       instance.name, force_open, info)
4105
  result.Raise("Can't create block device %s on"
4106
               " node %s for instance %s" % (device, node, instance.name))
4107
  if device.physical_id is None:
4108
    device.physical_id = result.payload
4109

    
4110

    
4111
def _GenerateUniqueNames(lu, exts):
4112
  """Generate a suitable LV name.
4113

4114
  This will generate a logical volume name for the given instance.
4115

4116
  """
4117
  results = []
4118
  for val in exts:
4119
    new_id = lu.cfg.GenerateUniqueID()
4120
    results.append("%s%s" % (new_id, val))
4121
  return results
4122

    
4123

    
4124
def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4125
                         p_minor, s_minor):
4126
  """Generate a drbd8 device complete with its children.
4127

4128
  """
4129
  port = lu.cfg.AllocatePort()
4130
  vgname = lu.cfg.GetVGName()
4131
  shared_secret = lu.cfg.GenerateDRBDSecret()
4132
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4133
                          logical_id=(vgname, names[0]))
4134
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4135
                          logical_id=(vgname, names[1]))
4136
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4137
                          logical_id=(primary, secondary, port,
4138
                                      p_minor, s_minor,
4139
                                      shared_secret),
4140
                          children=[dev_data, dev_meta],
4141
                          iv_name=iv_name)
4142
  return drbd_dev
4143

    
4144

    
4145
def _GenerateDiskTemplate(lu, template_name,
4146
                          instance_name, primary_node,
4147
                          secondary_nodes, disk_info,
4148
                          file_storage_dir, file_driver,
4149
                          base_index):
4150
  """Generate the entire disk layout for a given template type.
4151

4152
  """
4153
  #TODO: compute space requirements
4154

    
4155
  vgname = lu.cfg.GetVGName()
4156
  disk_count = len(disk_info)
4157
  disks = []
4158
  if template_name == constants.DT_DISKLESS:
4159
    pass
4160
  elif template_name == constants.DT_PLAIN:
4161
    if len(secondary_nodes) != 0:
4162
      raise errors.ProgrammerError("Wrong template configuration")
4163

    
4164
    names = _GenerateUniqueNames(lu, [".disk%d" % i
4165
                                      for i in range(disk_count)])
4166
    for idx, disk in enumerate(disk_info):
4167
      disk_index = idx + base_index
4168
      disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4169
                              logical_id=(vgname, names[idx]),
4170
                              iv_name="disk/%d" % disk_index,
4171
                              mode=disk["mode"])
4172
      disks.append(disk_dev)
4173
  elif template_name == constants.DT_DRBD8:
4174
    if len(secondary_nodes) != 1:
4175
      raise errors.ProgrammerError("Wrong template configuration")
4176
    remote_node = secondary_nodes[0]
4177
    minors = lu.cfg.AllocateDRBDMinor(
4178
      [primary_node, remote_node] * len(disk_info), instance_name)
4179

    
4180
    names = []
4181
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4182
                                               for i in range(disk_count)]):
4183
      names.append(lv_prefix + "_data")
4184
      names.append(lv_prefix + "_meta")
4185
    for idx, disk in enumerate(disk_info):
4186
      disk_index = idx + base_index
4187
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4188
                                      disk["size"], names[idx*2:idx*2+2],
4189
                                      "disk/%d" % disk_index,
4190
                                      minors[idx*2], minors[idx*2+1])
4191
      disk_dev.mode = disk["mode"]
4192
      disks.append(disk_dev)
4193
  elif template_name == constants.DT_FILE:
4194
    if len(secondary_nodes) != 0:
4195
      raise errors.ProgrammerError("Wrong template configuration")
4196

    
4197
    for idx, disk in enumerate(disk_info):
4198
      disk_index = idx + base_index
4199
      disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4200
                              iv_name="disk/%d" % disk_index,
4201
                              logical_id=(file_driver,
4202
                                          "%s/disk%d" % (file_storage_dir,
4203
                                                         disk_index)),
4204
                              mode=disk["mode"])
4205
      disks.append(disk_dev)
4206
  else:
4207
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4208
  return disks


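# Illustrative usage (values made up for the example): each entry of the
# disk_info list handed to _GenerateDiskTemplate above is a dict with "size"
# (in MiB) and "mode" keys; for DT_PLAIN the call below would yield a single
# LV-backed objects.Disk with iv_name "disk/0":
#
#   disk_info = [{"size": 1024, "mode": "rw"}]
#   disks = _GenerateDiskTemplate(lu, constants.DT_PLAIN, "inst1.example.com",
#                                 "node1.example.com", [], disk_info,
#                                 None, None, 0)
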
def _GetInstanceInfoText(instance):
4212
  """Compute that text that should be added to the disk's metadata.
4213

4214
  """
4215
  return "originstname+%s" % instance.name
4216

    
4217

    
4218
def _CreateDisks(lu, instance):
4219
  """Create all disks for an instance.
4220

4221
  This abstracts away some work from AddInstance.
4222

4223
  @type lu: L{LogicalUnit}
4224
  @param lu: the logical unit on whose behalf we execute
4225
  @type instance: L{objects.Instance}
4226
  @param instance: the instance whose disks we should create
4227
  @rtype: boolean
4228
  @return: the success of the creation
4229

4230
  """
4231
  info = _GetInstanceInfoText(instance)
4232
  pnode = instance.primary_node
4233

    
4234
  if instance.disk_template == constants.DT_FILE:
4235
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4236
    result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4237

    
4238
    result.Raise("Failed to create directory '%s' on"
4239
                 " node %s: %s" % (file_storage_dir, pnode))
4240

    
4241
  # Note: this needs to be kept in sync with adding of disks in
4242
  # LUSetInstanceParams
4243
  for device in instance.disks:
4244
    logging.info("Creating volume %s for instance %s",
4245
                 device.iv_name, instance.name)
4246
    #HARDCODE
4247
    for node in instance.all_nodes:
4248
      f_create = node == pnode
4249
      _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)


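# Illustrative sketch (example only): the per-node flag computation inside
# _CreateDisks above -- creation is forced, and the device opened, only on
# the primary node; secondaries rely on CreateOnSecondary().
def _ExampleCreateDiskFlags(all_nodes, pnode):
  """Return a dict mapping node -> (force_create, force_open)."""
  flags = {}
  for node in all_nodes:
    f_create = (node == pnode)
    flags[node] = (f_create, f_create)
  return flags
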
def _RemoveDisks(lu, instance):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should remove
  @rtype: boolean
  @return: the success of the removal

  """
  logging.info("Removing block devices for instance %s", instance.name)

  all_result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(disk, node)
      msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
      if msg:
        lu.LogWarning("Could not remove block device %s on node %s,"
                      " continuing anyway: %s", device.iv_name, node, msg)
        all_result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
                                                 file_storage_dir)
    msg = result.fail_msg
    if msg:
      lu.LogWarning("Could not remove directory '%s' on node %s: %s",
                    file_storage_dir, instance.primary_node, msg)
      all_result = False

  return all_result


def _ComputeDiskSize(disk_template, disks):
  """Compute disk size requirements in the volume group

  """
  # Required free disk space as a function of the disk template and sizes
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: sum(d["size"] for d in disks),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
    constants.DT_FILE: None,
  }

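  # Example: for DT_DRBD8 with two disks of 1024 and 2048 MB the dict above
  # yields (1024 + 128) + (2048 + 128) = 3328 MB of required volume group
  # space, DT_PLAIN yields 3072 MB, and the diskless and file-based
  # templates need no volume group space at all.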
  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]


def _CheckHVParams(lu, nodenames, hvname, hvparams):
  """Hypervisor parameter validation.

  This function abstracts the hypervisor parameter validation to be
  used in both instance create and instance modify.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type nodenames: list
  @param nodenames: the list of nodes on which we should check
  @type hvname: string
  @param hvname: the name of the hypervisor we should use
  @type hvparams: dict
  @param hvparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
                                                  hvname,
                                                  hvparams)
  for node in nodenames:
    info = hvinfo[node]
    if info.offline:
      continue
    info.Raise("Hypervisor parameter validation failed on node %s" % node)


class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disks", "disk_template",
              "mode", "start",
              "wait_for_sync", "ip_check", "nics",
              "hvparams", "beparams"]
  REQ_BGL = False

  def _ExpandNode(self, node):
    """Expands and checks one node name.

    """
    node_full = self.cfg.ExpandNodeName(node)
    if node_full is None:
      raise errors.OpPrereqError("Unknown node %s" % node)
    return node_full

  def ExpandNames(self):
    """ExpandNames for CreateInstance.

    Figure out the right locks for instance creation.

    """
    self.needed_locks = {}

    # set optional parameters to none if they don't exist
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    # cheap checks, mostly valid constants given

    # verify creation mode
    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    if self.op.hypervisor is None:
      self.op.hypervisor = self.cfg.GetHypervisorType()

    cluster = self.cfg.GetClusterInfo()
    enabled_hvs = cluster.enabled_hypervisors
    if self.op.hypervisor not in enabled_hvs:
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
                                 " cluster (%s)" % (self.op.hypervisor,
                                  ",".join(enabled_hvs)))

    # check hypervisor parameter syntax (locally)
    utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
    filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
                                  self.op.hvparams)
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
    hv_type.CheckParameterSyntax(filled_hvp)

    # fill and remember the beparams dict
    utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
    self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
                                    self.op.beparams)

    #### instance parameters check

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)
    self.op.instance_name = instance_name = hostname1.name

    # this is just a preventive check, but someone might still add this
    # instance in the meantime, and creation will fail at lock-add time
    if instance_name in self.cfg.GetInstanceList():
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    self.add_locks[locking.LEVEL_INSTANCE] = instance_name

    # NIC buildup
    self.nics = []
    for idx, nic in enumerate(self.op.nics):
      nic_mode_req = nic.get("mode", None)
      nic_mode = nic_mode_req
      if nic_mode is None:
        nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]

      # in routed mode, for the first nic, the default ip is 'auto'
      if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
        default_ip_mode = constants.VALUE_AUTO
      else:
        default_ip_mode = constants.VALUE_NONE

      # ip validity checks
      ip = nic.get("ip", default_ip_mode)
      if ip is None or ip.lower() == constants.VALUE_NONE:
        nic_ip = None
      elif ip.lower() == constants.VALUE_AUTO:
        nic_ip = hostname1.ip
      else:
        if not utils.IsValidIP(ip):
          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
                                     " like a valid IP" % ip)
        nic_ip = ip

      # TODO: check the ip for uniqueness !!
      if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
        raise errors.OpPrereqError("Routed nic mode requires an ip address")

      # MAC address verification
      mac = nic.get("mac", constants.VALUE_AUTO)
      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
        if not utils.IsValidMac(mac.lower()):
          raise errors.OpPrereqError("Invalid MAC address specified: %s" %
                                     mac)
      # bridge verification
      bridge = nic.get("bridge", None)
      link = nic.get("link", None)
      if bridge and link:
        raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
                                   " at the same time")
      elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
        raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
      elif bridge:
        link = bridge

      nicparams = {}
      if nic_mode_req:
        nicparams[constants.NIC_MODE] = nic_mode_req
      if link:
        nicparams[constants.NIC_LINK] = link

      check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
                                      nicparams)
      objects.NIC.CheckParameterSyntax(check_params)
      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))

    # disk checks/pre-build
    self.disks = []
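    # Each entry of self.op.disks is expected to be a dict such as, for
    # example, {"size": "10240", "mode": "rw"} (the exact mode strings are
    # whatever constants.DISK_ACCESS_SET accepts); the loop below validates
    # the mode and normalises the size to an integer number of megabytes.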
    for disk in self.op.disks:
      mode = disk.get("mode", constants.DISK_RDWR)
      if mode not in constants.DISK_ACCESS_SET:
        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                   mode)
      size = disk.get("size", None)
      if size is None:
        raise errors.OpPrereqError("Missing disk size")
      try:
        size = int(size)
      except ValueError:
        raise errors.OpPrereqError("Invalid disk size '%s'" % size)
      self.disks.append({"size": size, "mode": mode})

    # used in CheckPrereq for ip ping check
    self.check_ip = hostname1.ip

    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory path not absolute")

    ### Node/iallocator related checks
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.op.pnode = self._ExpandNode(self.op.pnode)
      nodelist = [self.op.pnode]
      if self.op.snode is not None:
        self.op.snode = self._ExpandNode(self.op.snode)
        nodelist.append(self.op.snode)
      self.needed_locks[locking.LEVEL_NODE] = nodelist

    # in case of import lock the source node too
    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)

      if src_path is None:
        self.op.src_path = src_path = self.op.instance_name

      if src_node is None:
        self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
        self.op.src_node = None
        if os.path.isabs(src_path):
          raise errors.OpPrereqError("Importing an instance from an absolute"
                                     " path requires a source node option.")
      else:
        self.op.src_node = src_node = self._ExpandNode(src_node)
        if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
          self.needed_locks[locking.LEVEL_NODE].append(src_node)
        if not os.path.isabs(src_path):
          self.op.src_path = src_path = \
            os.path.join(constants.EXPORT_DIR, src_path)

    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    nics = [n.ToDict() for n in self.nics]
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.be_full[constants.BE_VCPUS],
                     mem_size=self.be_full[constants.BE_MEMORY],
                     disks=self.disks,
                     nics=nics,
                     hypervisor=self.op.hypervisor,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
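    # At this point ial.nodes holds the allocator's choice, e.g. a list such
    # as ["node1.example.com", "node2.example.com"] (hypothetical names) for
    # a two-node disk template: the first entry becomes the primary node
    # and, when a second node is required, the second becomes the secondary.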
    self.op.pnode = ial.nodes[0]
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 ", ".join(ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["SRC_NODE"] = self.op.src_node
      env["SRC_PATH"] = self.op.src_path
      env["SRC_IMAGES"] = self.src_images

    env.update(_BuildInstanceHookEnv(
      name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.op.start,
      os_type=self.op.os_type,
      memory=self.be_full[constants.BE_MEMORY],
      vcpus=self.be_full[constants.BE_VCPUS],
      nics=_PreBuildNICHooksList(self, self.nics),
      disk_template=self.op.disk_template,
      disks=[(d["size"], d["mode"]) for d in self.disks],
    ))

    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl


  def CheckPrereq(self):
    """Check prerequisites.

    """
    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = self.op.src_node
      src_path = self.op.src_path

      if src_node is None:
        locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
        exp_list = self.rpc.call_export_list(locked_nodes)
        found = False
        for node in exp_list:
          if exp_list[node].fail_msg:
            continue
          if src_path in exp_list[node].payload:
            found = True
            self.op.src_node = src_node = node
            self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
                                                       src_path)
            break
        if not found:
          raise errors.OpPrereqError("No export found for relative path %s" %
                                      src_path)

      _CheckNodeOnline(self, src_node)
      result = self.rpc.call_export_info(src_node, src_path)
      result.Raise("No export or invalid export found in dir %s" % src_path)

      export_info = objects.SerializableConfigParser.Loads(str(result.payload))
      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      # Check that the new instance doesn't have less disks than the export
      instance_disks = len(self.disks)
      export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
      if instance_disks < export_disks:
        raise errors.OpPrereqError("Not enough disks to import."
                                   " (instance: %d, export: %d)" %
                                   (instance_disks, export_disks))

      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      disk_images = []
      for idx in range(export_disks):
        option = 'disk%d_dump' % idx
        if export_info.has_option(constants.INISECT_INS, option):
          # FIXME: are the old os-es, disk sizes, etc. useful?
          export_name = export_info.get(constants.INISECT_INS, option)
          image = os.path.join(src_path, export_name)
          disk_images.append(image)
        else:
          disk_images.append(False)

      self.src_images = disk_images

      old_name = export_info.get(constants.INISECT_INS, 'name')
      # FIXME: int() here could throw a ValueError on broken exports
      exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
      if self.op.instance_name == old_name:
        for idx, nic in enumerate(self.nics):
          if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
            nic_mac_ini = 'nic%d_mac' % idx
            nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)

    # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
    # ip ping checks (we use the same ip that was resolved in ExpandNames)
    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (self.check_ip, self.op.instance_name))

    #### mac address generation
    # By generating here the mac address both the allocator and the hooks get
    # the real final mac address rather than the 'auto' or 'generate' value.
    # There is a race condition between the generation and the instance object
    # creation, which means that we know the mac is valid now, but we're not
    # sure it will be when we actually add the instance. If things go bad
    # adding the instance will abort because of a duplicate mac, and the
    # creation job will fail.
    for nic in self.nics:
      if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
        nic.mac = self.cfg.GenerateMAC()

    #### allocator run

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
    assert self.pnode is not None, \
      "Cannot retrieve locked node %s" % self.op.pnode
    if pnode.offline:
      raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
                                 pnode.name)
    if pnode.drained:
      raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
                                 pnode.name)

    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if self.op.snode is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")
      if self.op.snode == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      _CheckNodeOnline(self, self.op.snode)
      _CheckNodeNotDrained(self, self.op.snode)
      self.secondaries.append(self.op.snode)

    nodenames = [pnode.name] + self.secondaries

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.disks)

    # Check lv size requirements
    if req_size is not None:
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                         self.op.hypervisor)
      for node in nodenames:
        info = nodeinfo[node]
        info.Raise("Cannot get current information from node %s" % node)
        info = info.payload
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > vg_free:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, vg_free, req_size))

    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)

    # os verification
    result = self.rpc.call_os_get(pnode.name, self.op.os_type)
    result.Raise("OS '%s' not in supported os list for primary node %s" %
                 (self.op.os_type, pnode.name), prereq=True)

    _CheckNicsBridgesExist(self, self.nics, self.pnode.name)

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.be_full[constants.BE_MEMORY],
                           self.op.hypervisor)

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    ht_kind = self.op.hypervisor
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    ##if self.op.vnc_bind_address is None:
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.cfg.GetFileStorageDir(),
                                        string_file_storage_dir, instance))


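    # As an illustration, with a cluster file storage directory of
    # "/srv/ganeti/file-storage" (the path is only an example) and no
    # per-instance override, the path above becomes
    # ".../file-storage/<instance name>"; _GenerateDiskTemplate only uses it
    # for the DT_FILE disk template.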
    disks = _GenerateDiskTemplate(self,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries,
                                  self.disks,
                                  file_storage_dir,
                                  self.op.file_driver,
                                  0)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            nics=self.nics, disks=disks,
                            disk_template=self.op.disk_template,
                            admin_up=False,
                            network_port=network_port,
                            beparams=self.op.beparams,
                            hvparams=self.op.hvparams,
                            hypervisor=self.op.hypervisor,
                            )

    feedback_fn("* creating instance disks...")
    try:
      _CreateDisks(self, iobj)
    except errors.OpExecError:
      self.LogWarning("Device creation failed, reverting...")
      try:
        _RemoveDisks(self, iobj)
      finally:
        self.cfg.ReleaseDRBDMinors(instance)
        raise

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Declare that we don't want to remove the instance lock anymore, as we've
    # added the instance to the config
    del self.remove_locks[locking.LEVEL_INSTANCE]
    # Unlock all the nodes
    if self.op.mode == constants.INSTANCE_IMPORT:
      nodes_keep = [self.op.src_node]
      nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
                       if node != self.op.src_node]
      self.context.glm.release(locking.LEVEL_NODE, nodes_release)
      self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
    else:
      self.context.glm.release(locking.LEVEL_NODE)
      del self.acquired_locks[locking.LEVEL_NODE]

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, iobj)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(self, iobj)
      self.cfg.RemoveInstance(iobj.name)
      # Make sure the instance lock gets removed
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
        result.Raise("Could not add os for instance %s"
                     " on node %s" % (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_images = self.src_images
        cluster_name = self.cfg.GetClusterName()
        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
                                                         src_node, src_images,
                                                         cluster_name)
        msg = import_result.fail_msg
        if msg:
          self.LogWarning("Error while importing the disk images for instance"
                          " %s on node %s: %s" % (instance, pnode_name, msg))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      iobj.admin_up = True
      self.cfg.Update(iobj)
      logging.info("Starting instance %s on node %s", instance, pnode_name)
      feedback_fn("* starting instance...")
      result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
      result.Raise("Could not start instance")


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = self.rpc.call_instance_list([node],
                                             [instance.hypervisor])[node]
    node_insts.Raise("Can't get node information from %s" % node)

    if instance.name not in node_insts.payload:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logging.debug("Connecting to console of %s on %s", instance.name, node)

    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    cluster = self.cfg.GetClusterInfo()
    # beparams and hvparams are passed separately, to avoid editing the
    # instance and then saving the defaults in the instance itself.
    hvparams = cluster.FillHV(instance)
    beparams = cluster.FillBE(instance)
    console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)

    # build ssh cmdline
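    # Note that the LU never opens the console itself: it only returns the
    # assembled command (an ssh invocation wrapping the hypervisor console
    # command) to the caller, which is expected to run it on the master node
    # as described in the class docstring.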
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]
  REQ_BGL = False

  def CheckArguments(self):
    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None
    if not hasattr(self.op, "iallocator"):
      self.op.iallocator = None

    # check for valid parameter combination
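    # Accepted combinations, as enforced below: when changing the secondary
    # (REPLACE_DISK_CHG) exactly one of remote_node/iallocator must be
    # given; for the other replacement modes neither may be given.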
    cnt = [self.op.remote_node, self.op.iallocator].count(None)
    if self.op.mode == constants.REPLACE_DISK_CHG:
      if cnt == 2:
        raise errors.OpPrereqError("When changing the secondary either an"
                                   " iallocator script must be used or the"
                                   " new node given")
      elif cnt == 0:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
    else: # not replacing the secondary
      if cnt != 2:
        raise errors.OpPrereqError("The iallocator and new node options can"
                                   " be used only when changing the"
                                   " secondary node")

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    if self.op.iallocator is not None:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    elif self.op.remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.op.remote_node = remote_node
      # Warning: do not remove the locking of the new secondary here
      # unless DRBD8.AddChildren is changed to work in parallel;
      # currently it doesn't since parallel invocations of
      # FindUnusedMinor will conflict
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    # If we're not already locking all nodes in the set we have to declare the
    # instance's primary/secondary nodes.
    if (level == locking.LEVEL_NODE and
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
      self._LockInstancesNodes()

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5042
                                 " of nodes (%s), required %s" %
5043
                                 (len(ial.nodes), ial.required_nodes))
5044
    self.op.remote_node = ial.nodes[0]
    self.LogInfo("Selected new secondary for the instance: %s",
                 self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

    if instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
                                 " instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    if self.op.iallocator is not None:
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      raise errors.OpPrereqError("The specified node is already the"
                                 " secondary node of the instance.")

    if self.op.mode == constants.REPLACE_DISK_PRI:
      n1 = self.tgt_node = instance.primary_node
      n2 = self.oth_node = self.sec_node
    elif self.op.mode == constants.REPLACE_DISK_SEC:
      n1 = self.tgt_node = self.sec_node
      n2 = self.oth_node = instance.primary_node
    elif self.op.mode == constants.REPLACE_DISK_CHG:
      n1 = self.new_node = remote_node
      n2 = self.oth_node = instance.primary_node
      self.tgt_node = self.sec_node
      _CheckNodeNotDrained(self, remote_node)
    else:
      raise errors.ProgrammerError("Unhandled disk replace mode")

    _CheckNodeOnline(self, n1)
    _CheckNodeOnline(self, n2)

    if not self.op.disks:
      self.op.disks = range(len(instance.disks))

    for disk_idx in self.op.disks:
      instance.FindDisk(disk_idx)

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:

      1. for each disk to be replaced:

        1. create new LVs on the target node with unique names
        1. detach old LVs from the drbd device
        1. rename old LVs to name_replaced.<time_t>
        1. rename new LVs to old LVs
        1. attach the new LVs (with the old names now) to the drbd device

      1. wait for sync across all devices

      1. for each modified disk:

        1. remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = self.rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results[node]
      res.Raise("Error checking node %s" % node)
      if my_vg not in res.payload:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking disk/%d on %s" % (idx, node))
        cfg.SetDiskID(dev, node)
        result = self.rpc.call_blockdev_find(node, dev)
        msg = result.fail_msg
        if not msg and not result.payload:
          msg = "disk not found"
        if msg:
          raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
                                   (idx, node, msg))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      info("checking disk/%d consistency on %s" % (idx, oth_node))
      if not _CheckDiskConsistency(self, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".disk%d_%s" % (idx, suf)
                  for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(self, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # we pass force_create=True to force the LVM creation
      for new_lv in new_lvs:
        _CreateBlockDev(self, tgt_node, instance, new_lv, True,
                        _GetInstanceInfoText(instance), False)

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
      result.Raise("Can't detach drbd from local storage on node"
                   " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
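      # For example, a (hypothetical) data LV named "xyz.disk0_data" would
      # be renamed to "xyz.disk0_data_replaced-<unix timestamp>", freeing
      # the original name so the new LV can take it over below.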
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        result = self.rpc.call_blockdev_find(tgt_node, to_ren)
        if not result.fail_msg and result.payload:
          # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
      result.Raise("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
      result.Raise("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
      msg = result.fail_msg
      if msg:
        for new_lv in new_lvs:
          msg2 = self.rpc.call_blockdev_remove(tgt_node, new_lv).fail_msg
          if msg2:
            warning("Can't rollback device %s: %s", dev, msg2,
                    hint="cleanup manually the unused logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(self, instance, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      result = self.rpc.call_blockdev_find(instance.primary_node, dev)
      msg = result.fail_msg
      if not msg and not result.payload:
        msg = "disk not found"
      if msg:
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
                                 (name, msg))
      if result.payload[5]:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        msg = self.rpc.call_blockdev_remove(tgt_node, lv).fail_msg
        if msg:
          warning("Can't remove old LV: %s" % msg,
                  hint="manually remove unused LVs")
          continue

  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node
    nodes_ip = {
      old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
      new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
      pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
      }

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = self.rpc.call_vg_list([pri_node, new_node])
    for node in pri_node, new_node:
      res = results[node]
      res.Raise("Error checking node %s" % node)
      if my_vg not in res.payload:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      info("checking disk/%d on %s" % (idx, pri_node))
      cfg.SetDiskID(dev, pri_node)
      result = self.rpc.call_blockdev_find(pri_node, dev)
      msg = result.fail_msg
      if not msg and not result.payload:
        msg = "disk not found"
      if msg:
        raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
                                 (idx, pri_node, msg))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      info("checking disk/%d consistency on %s" % (idx, pri_node))
      if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for idx, dev in enumerate(instance.disks):
      info("adding new local storage on %s for disk/%d" %
           (new_node, idx))
      # we pass force_create=True to force LVM creation
      for new_lv in dev.children:
        _CreateBlockDev(self, new_node, instance, new_lv, True,
                        _GetInstanceInfoText(instance), False)

    # Step 4: drbd minors and drbd setup changes
    # after this, we must manually remove the drbd minors on both the
    # error and the success paths
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
                                   instance.name)
    logging.debug("Allocated minors %s", minors)
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
    for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
      size = dev.size
      info("activating a new drbd on %s for disk/%d" % (new_node, idx))
      # create new devices on new_node; note that we create two IDs:
      # one without port, so the drbd will be activated without
      # networking information on the new node at this stage, and one
      # with network, for the latter activation in step 4
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
      if pri_node == o_node1:
        p_minor = o_minor1
      else:
        p_minor = o_minor2

      new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
      new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)

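      # Both tuples use the DRBD8 logical_id layout unpacked above, i.e.
      # (node_a, node_b, port, minor_a, minor_b, secret); the "alone" id
      # has no port so the device is brought up without networking first,
      # while the "net" id is the one stored for the later network attach.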
      iv_names[idx] = (dev, dev.children, new_net_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_net_id)
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=new_alone_id,
                              children=dev.children)
      try:
        _CreateSingleBlockDev(self, new_node, instance, new_drbd,
                              _GetInstanceInfoText(instance), False)
      except errors.GenericError:
        self.cfg.ReleaseDRBDMinors(instance.name)
        raise

    for idx, dev in enumerate(instance.disks):
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for disk/%d on old node" % idx)
      cfg.SetDiskID(dev, old_node)
      msg = self.rpc.call_blockdev_shutdown(old_node, dev).fail_msg
      if msg:
        warning("Failed to shutdown drbd for disk/%d on old node: %s" %
                (idx, msg),
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
                                               instance.disks)[pri_node]

    msg = result.fail_msg
    if msg:
      # detaches didn't succeed (unlikely)
      self.cfg.ReleaseDRBDMinors(instance.name)
      raise errors.OpExecError("Can't detach the disks from the network on"
                               " old node: %s" % (msg,))

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev, _, new_logical_id in iv_names.itervalues():
      dev.logical_id = new_logical_id
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
                                           instance.disks, instance.name,
                                           False)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        warning("can't attach drbd disks on node %s: %s", to_node, msg,
                hint="please do a gnt-instance info to see the"
                " status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(self, instance, unlock=True)

    # so check manually all the devices
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      result = self.rpc.call_blockdev_find(pri_node, dev)
      msg = result.fail_msg
      if not msg and not result.payload:
        msg = "disk not found"
      if msg:
        raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
                                 (idx, msg))
      if result.payload[5]:
        raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
      info("remove logical volumes for disk/%d" % idx)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        msg = self.rpc.call_blockdev_remove(old_node, lv).fail_msg
        if msg:
          warning("Can't remove LV on old secondary: %s", msg,
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance

    # Activate the instance disks if we're replacing them on a down instance
    if not instance.admin_up:
      _StartInstanceDisks(self, instance, True)

    if self.op.mode == constants.REPLACE_DISK_CHG:
      fn = self._ExecD8Secondary
    else:
      fn = self._ExecD8DiskOnly

    ret = fn(feedback_fn)

    # Deactivate the instance disks if we're replacing them on a down instance
    if not instance.admin_up:
      _SafeShutdownInstanceDisks(self, instance)

    return ret


class LUGrowDisk(LogicalUnit):
5530
  """Grow a disk of an instance.
5531

5532
  """
5533
  HPATH = "disk-grow"
5534
  HTYPE = constants.HTYPE_INSTANCE
5535
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5536
  REQ_BGL = False
5537

    
5538
  def ExpandNames(self):
5539
    self._ExpandAndLockInstance()
5540
    self.needed_locks[locking.LEVEL_NODE] = []
5541
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5542

    
5543
  def DeclareLocks(self, level):
5544
    if level == locking.LEVEL_NODE:
5545
      self._LockInstancesNodes()
5546

    
5547
  def BuildHooksEnv(self):
5548
    """Build hooks env.
5549

5550
    This runs on the master, the primary and all the secondaries.
5551

5552
    """
5553
    env = {
5554
      "DISK": self.op.disk,
5555
      "AMOUNT": self.op.amount,
5556
      }
5557
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5558
    nl = [
5559
      self.cfg.GetMasterNode(),
5560
      self.instance.primary_node,
5561
      ]
5562
    return env, nl, nl
5563

    
5564
  def CheckPrereq(self):
5565
    """Check prerequisites.
5566

5567
    This checks that the instance is in the cluster.
5568

5569
    """
5570
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5571
    assert instance is not None, \
5572
      "Cannot retrieve locked instance %s" % self.op.instance_name
5573
    nodenames = list(instance.all_nodes)
5574
    for node in nodenames:
5575
      _CheckNodeOnline(self, node)
5576

    
5577

    
5578
    self.instance = instance
5579

    
5580
    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5581
      raise errors.OpPrereqError("Instance's disk layout does not support"
5582
                                 " growing.")
5583

    
5584
    self.disk = instance.FindDisk(self.op.disk)
5585

    
5586
    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5587
                                       instance.hypervisor)
5588
    for node in nodenames:
5589
      info = nodeinfo[node]
5590
      info.Raise("Cannot get current information from node %s" % node)
5591
      vg_free = info.payload.get('vg_free', None)
5592
      if not isinstance(vg_free, int):
5593
        raise errors.OpPrereqError("Can't compute free disk space on"
5594
                                   " node %s" % node)
5595
      if self.op.amount > vg_free:
5596
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
5597
                                   " %d MiB available, %d MiB required" %
5598
                                   (node, vg_free, self.op.amount))
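      # Example of the check above: a request to grow a disk by 2048 MiB
      # on a node whose volume group reports only 1024 MiB free is
      # rejected here, before any block device operation is attempted.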
5599

    
5600
  def Exec(self, feedback_fn):
5601
    """Execute disk grow.
5602

5603
    """
5604
    instance = self.instance
5605
    disk = self.disk
5606
    for node in instance.all_nodes:
5607
      self.cfg.SetDiskID(disk, node)
5608
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5609
      result.Raise("Grow request failed to node %s" % node)
5610
    disk.RecordGrow(self.op.amount)
5611
    self.cfg.Update(instance)
5612
    if self.op.wait_for_sync:
5613
      disk_abort = not _WaitForSync(self, instance)
5614
      if disk_abort:
5615
        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5616
                             " status.\nPlease check the instance.")
5617

    
5618

    
5619
class LUQueryInstanceData(NoHooksLU):
5620
  """Query runtime instance data.
5621

5622
  """
5623
  _OP_REQP = ["instances", "static"]
5624
  REQ_BGL = False
5625

    
5626
  def ExpandNames(self):
5627
    self.needed_locks = {}
5628
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5629

    
5630
    if not isinstance(self.op.instances, list):
5631
      raise errors.OpPrereqError("Invalid argument type 'instances'")
5632

    
5633
    if self.op.instances:
5634
      self.wanted_names = []
5635
      for name in self.op.instances:
5636
        full_name = self.cfg.ExpandInstanceName(name)
5637
        if full_name is None:
5638
          raise errors.OpPrereqError("Instance '%s' not known" % name)
5639
        self.wanted_names.append(full_name)
5640
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5641
    else:
5642
      self.wanted_names = None
5643
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5644

    
5645
    self.needed_locks[locking.LEVEL_NODE] = []
5646
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5647

    
5648
  def DeclareLocks(self, level):
5649
    if level == locking.LEVEL_NODE:
5650
      self._LockInstancesNodes()
5651

    
5652
  def CheckPrereq(self):
5653
    """Check prerequisites.
5654

5655
    This only checks the optional instance list against the existing names.
5656

5657
    """
5658
    if self.wanted_names is None:
5659
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5660

    
5661
    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5662
                             in self.wanted_names]
5663
    return
5664

    
5665
  def _ComputeDiskStatus(self, instance, snode, dev):
5666
    """Compute block device status.
5667

5668
    """
5669
    static = self.op.static
5670
    if not static:
5671
      self.cfg.SetDiskID(dev, instance.primary_node)
5672
      dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5673
      if dev_pstatus.offline:
5674
        dev_pstatus = None
5675
      else:
5676
        dev_pstatus.Raise("Can't compute disk status for %s" % instance.name)
5677
        dev_pstatus = dev_pstatus.payload
5678
    else:
5679
      dev_pstatus = None
5680

    
5681
    if dev.dev_type in constants.LDS_DRBD:
5682
      # we change the snode then (otherwise we use the one passed in)
5683
      if dev.logical_id[0] == instance.primary_node:
5684
        snode = dev.logical_id[1]
5685
      else:
5686
        snode = dev.logical_id[0]
5687

    
5688
    if snode and not static:
5689
      self.cfg.SetDiskID(dev, snode)
5690
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5691
      if dev_sstatus.offline:
5692
        dev_sstatus = None
5693
      else:
5694
        dev_sstatus.Raise("Can't compute disk status for %s" % instance.name)
5695
        dev_sstatus = dev_sstatus.payload
5696
    else:
5697
      dev_sstatus = None
5698

    
5699
    if dev.children:
5700
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
5701
                      for child in dev.children]
5702
    else:
5703
      dev_children = []
5704

    
5705
    data = {
5706
      "iv_name": dev.iv_name,
5707
      "dev_type": dev.dev_type,
5708
      "logical_id": dev.logical_id,
5709
      "physical_id": dev.physical_id,
5710
      "pstatus": dev_pstatus,
5711
      "sstatus": dev_sstatus,
5712
      "children": dev_children,
5713
      "mode": dev.mode,
5714
      }
5715

    
5716
    return data
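    # For DRBD-based disks this structure nests: "children" holds the same
    # dict, computed recursively above, for the backing logical volumes of
    # the DRBD device (typically a data and a metadata volume).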
5717

    
5718
  def Exec(self, feedback_fn):
5719
    """Gather and return data"""
5720
    result = {}
5721

    
5722
    cluster = self.cfg.GetClusterInfo()
5723

    
5724
    for instance in self.wanted_instances:
5725
      if not self.op.static:
5726
        remote_info = self.rpc.call_instance_info(instance.primary_node,
5727
                                                  instance.name,
5728
                                                  instance.hypervisor)
5729
        remote_info.Raise("Error checking node %s" % instance.primary_node)
5730
        remote_info = remote_info.payload
5731
        if remote_info and "state" in remote_info:
5732
          remote_state = "up"
5733
        else:
5734
          remote_state = "down"
5735
      else:
5736
        remote_state = None
5737
      if instance.admin_up:
5738
        config_state = "up"
5739
      else:
5740
        config_state = "down"
5741

    
5742
      disks = [self._ComputeDiskStatus(instance, None, device)
5743
               for device in instance.disks]
5744

    
5745
      idict = {
5746
        "name": instance.name,
5747
        "config_state": config_state,
5748
        "run_state": remote_state,
5749
        "pnode": instance.primary_node,
5750
        "snodes": instance.secondary_nodes,
5751
        "os": instance.os,
5752
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5753
        "disks": disks,
5754
        "hypervisor": instance.hypervisor,
5755
        "network_port": instance.network_port,
5756
        "hv_instance": instance.hvparams,
5757
        "hv_actual": cluster.FillHV(instance),
5758
        "be_instance": instance.beparams,
5759
        "be_actual": cluster.FillBE(instance),
5760
        }
5761

    
5762
      result[instance.name] = idict
5763

    
5764
    return result
5765

    
5766

    
5767
class LUSetInstanceParams(LogicalUnit):
5768
  """Modifies an instances's parameters.
5769

5770
  """
5771
  HPATH = "instance-modify"
5772
  HTYPE = constants.HTYPE_INSTANCE
5773
  _OP_REQP = ["instance_name"]
5774
  REQ_BGL = False
5775

    
5776
  def CheckArguments(self):
5777
    if not hasattr(self.op, 'nics'):
5778
      self.op.nics = []
5779
    if not hasattr(self.op, 'disks'):
5780
      self.op.disks = []
5781
    if not hasattr(self.op, 'beparams'):
5782
      self.op.beparams = {}
5783
    if not hasattr(self.op, 'hvparams'):
5784
      self.op.hvparams = {}
5785
    self.op.force = getattr(self.op, "force", False)
5786
    if not (self.op.nics or self.op.disks or
5787
            self.op.hvparams or self.op.beparams):
5788
      raise errors.OpPrereqError("No changes submitted")
5789

    
5790
    # Disk validation
5791
    disk_addremove = 0
5792
    for disk_op, disk_dict in self.op.disks:
5793
      if disk_op == constants.DDM_REMOVE:
5794
        disk_addremove += 1
5795
        continue
5796
      elif disk_op == constants.DDM_ADD:
5797
        disk_addremove += 1
5798
      else:
5799
        if not isinstance(disk_op, int):
5800
          raise errors.OpPrereqError("Invalid disk index")
5801
      if disk_op == constants.DDM_ADD:
5802
        mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5803
        if mode not in constants.DISK_ACCESS_SET:
5804
          raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5805
        size = disk_dict.get('size', None)
5806
        if size is None:
5807
          raise errors.OpPrereqError("Required disk parameter size missing")
5808
        try:
5809
          size = int(size)
5810
        except ValueError, err:
5811
          raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5812
                                     str(err))
5813
        disk_dict['size'] = size
5814
      else:
5815
        # modification of disk
5816
        if 'size' in disk_dict:
5817
          raise errors.OpPrereqError("Disk size change not possible, use"
5818
                                     " grow-disk")
5819

    
5820
    if disk_addremove > 1:
5821
      raise errors.OpPrereqError("Only one disk add or remove operation"
5822
                                 " supported at a time")
5823

    
5824
    # NIC validation
5825
    nic_addremove = 0
5826
    for nic_op, nic_dict in self.op.nics:
5827
      if nic_op == constants.DDM_REMOVE:
5828
        nic_addremove += 1
5829
        continue
5830
      elif nic_op == constants.DDM_ADD:
5831
        nic_addremove += 1
5832
      else:
5833
        if not isinstance(nic_op, int):
5834
          raise errors.OpPrereqError("Invalid nic index")
5835

    
5836
      # nic_dict should be a dict
5837
      nic_ip = nic_dict.get('ip', None)
5838
      if nic_ip is not None:
5839
        if nic_ip.lower() == constants.VALUE_NONE:
5840
          nic_dict['ip'] = None
5841
        else:
5842
          if not utils.IsValidIP(nic_ip):
5843
            raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5844

    
5845
      nic_bridge = nic_dict.get('bridge', None)
5846
      nic_link = nic_dict.get('link', None)
5847
      if nic_bridge and nic_link:
5848
        raise errors.OpPrereqError("Cannot pass 'bridge' and 'link' at the same time")
5849
      elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
5850
        nic_dict['bridge'] = None
5851
      elif nic_link and nic_link.lower() == constants.VALUE_NONE:
5852
        nic_dict['link'] = None
5853

    
5854
      if nic_op == constants.DDM_ADD:
5855
        nic_mac = nic_dict.get('mac', None)
5856
        if nic_mac is None:
5857
          nic_dict['mac'] = constants.VALUE_AUTO
5858

    
5859
      if 'mac' in nic_dict:
5860
        nic_mac = nic_dict['mac']
5861
        if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5862
          if not utils.IsValidMac(nic_mac):
5863
            raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5864
        if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5865
          raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5866
                                     " modifying an existing nic")
5867

    
5868
    if nic_addremove > 1:
5869
      raise errors.OpPrereqError("Only one NIC add or remove operation"
5870
                                 " supported at a time")
5871

    
5872
  def ExpandNames(self):
5873
    self._ExpandAndLockInstance()
5874
    self.needed_locks[locking.LEVEL_NODE] = []
5875
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5876

    
5877
  def DeclareLocks(self, level):
5878
    if level == locking.LEVEL_NODE:
5879
      self._LockInstancesNodes()
5880

    
5881
  def BuildHooksEnv(self):
5882
    """Build hooks env.
5883

5884
    This runs on the master, primary and secondaries.
5885

5886
    """
5887
    args = dict()
5888
    if constants.BE_MEMORY in self.be_new:
5889
      args['memory'] = self.be_new[constants.BE_MEMORY]
5890
    if constants.BE_VCPUS in self.be_new:
5891
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
5892
    # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
5893
    # information at all.
5894
    if self.op.nics:
5895
      args['nics'] = []
5896
      nic_override = dict(self.op.nics)
5897
      c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
5898
      for idx, nic in enumerate(self.instance.nics):
5899
        if idx in nic_override:
5900
          this_nic_override = nic_override[idx]
5901
        else:
5902
          this_nic_override = {}
5903
        if 'ip' in this_nic_override:
5904
          ip = this_nic_override['ip']
5905
        else:
5906
          ip = nic.ip
5907
        if 'mac' in this_nic_override:
5908
          mac = this_nic_override['mac']
5909
        else:
5910
          mac = nic.mac
5911
        if idx in self.nic_pnew:
5912
          nicparams = self.nic_pnew[idx]
5913
        else:
5914
          nicparams = objects.FillDict(c_nicparams, nic.nicparams)
5915
        mode = nicparams[constants.NIC_MODE]
5916
        link = nicparams[constants.NIC_LINK]
5917
        args['nics'].append((ip, mac, mode, link))
5918
      if constants.DDM_ADD in nic_override:
5919
        ip = nic_override[constants.DDM_ADD].get('ip', None)
5920
        mac = nic_override[constants.DDM_ADD]['mac']
5921
        nicparams = self.nic_pnew[constants.DDM_ADD]
5922
        mode = nicparams[constants.NIC_MODE]
5923
        link = nicparams[constants.NIC_LINK]
5924
        args['nics'].append((ip, mac, mode, link))
5925
      elif constants.DDM_REMOVE in nic_override:
5926
        del args['nics'][-1]
5927

    
5928
    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5929
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
5930
    return env, nl, nl
5931

    
5932
  def _GetUpdatedParams(self, old_params, update_dict,
5933
                        default_values, parameter_types):
5934
    """Return the new params dict for the given params.
5935

5936
    @type old_params: dict
    @param old_params: old parameters
    @type update_dict: dict
    @param update_dict: dict containing new parameter values,
                        or constants.VALUE_DEFAULT to reset the
                        parameter to its default value
5942
    @type default_values: dict
5943
    @param default_values: default values for the filled parameters
5944
    @type parameter_types: dict
5945
    @param parameter_types: dict mapping target dict keys to types
5946
                            in constants.ENFORCEABLE_TYPES
5947
    @rtype: (dict, dict)
5948
    @return: (new_parameters, filled_parameters)
5949

5950
    """
5951
    params_copy = copy.deepcopy(old_params)
5952
    for key, val in update_dict.iteritems():
5953
      if val == constants.VALUE_DEFAULT:
5954
        try:
5955
          del params_copy[key]
5956
        except KeyError:
5957
          pass
5958
      else:
5959
        params_copy[key] = val
5960
    utils.ForceDictType(params_copy, parameter_types)
5961
    params_filled = objects.FillDict(default_values, params_copy)
5962
    return (params_copy, params_filled)
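    # Example: old_params={'memory': 512} and update_dict={'memory':
    # constants.VALUE_DEFAULT, 'vcpus': 2} yield ({'vcpus': 2}, filled),
    # where 'memory' reverts to its default and 'filled' also carries the
    # remaining keys from default_values.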
5963

    
5964
  def CheckPrereq(self):
5965
    """Check prerequisites.
5966

5967
    This only checks the instance list against the existing names.
5968

5969
    """
5970
    force = self.force = self.op.force
5971

    
5972
    # checking the new params on the primary/secondary nodes
5973

    
5974
    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5975
    cluster = self.cluster = self.cfg.GetClusterInfo()
5976
    assert self.instance is not None, \
5977
      "Cannot retrieve locked instance %s" % self.op.instance_name
5978
    pnode = instance.primary_node
5979
    nodelist = list(instance.all_nodes)
5980

    
5981
    # hvparams processing
5982
    if self.op.hvparams:
5983
      i_hvdict, hv_new = self._GetUpdatedParams(
5984
                             instance.hvparams, self.op.hvparams,
5985
                             cluster.hvparams[instance.hypervisor],
5986
                             constants.HVS_PARAMETER_TYPES)
5987
      # local check
5988
      hypervisor.GetHypervisor(
5989
        instance.hypervisor).CheckParameterSyntax(hv_new)
5990
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5991
      self.hv_new = hv_new # the new actual values
5992
      self.hv_inst = i_hvdict # the new dict (without defaults)
5993
    else:
5994
      self.hv_new = self.hv_inst = {}
5995

    
5996
    # beparams processing
5997
    if self.op.beparams:
5998
      i_bedict, be_new = self._GetUpdatedParams(
5999
                             instance.beparams, self.op.beparams,
6000
                             cluster.beparams[constants.PP_DEFAULT],
6001
                             constants.BES_PARAMETER_TYPES)
6002
      self.be_new = be_new # the new actual values
6003
      self.be_inst = i_bedict # the new dict (without defaults)
6004
    else:
6005
      self.be_new = self.be_inst = {}
6006

    
6007
    self.warn = []
6008

    
6009
    if constants.BE_MEMORY in self.op.beparams and not self.force:
6010
      mem_check_list = [pnode]
6011
      if be_new[constants.BE_AUTO_BALANCE]:
6012
        # either we changed auto_balance to yes or it was from before
6013
        mem_check_list.extend(instance.secondary_nodes)
6014
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
6015
                                                  instance.hypervisor)
6016
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6017
                                         instance.hypervisor)
6018
      pninfo = nodeinfo[pnode]
6019
      msg = pninfo.fail_msg
6020
      if msg:
6021
        # Assume the primary node is unreachable and go ahead
6022
        self.warn.append("Can't get info from primary node %s: %s" %
6023
                         (pnode,  msg))
6024
      elif not isinstance(pninfo.payload.get('memory_free', None), int):
6025
        self.warn.append("Node data from primary node %s doesn't contain"
6026
                         " free memory information" % pnode)
6027
      elif instance_info.fail_msg:
6028
        self.warn.append("Can't get instance runtime information: %s" %
6029
                        instance_info.fail_msg)
6030
      else:
6031
        if instance_info.payload:
6032
          current_mem = int(instance_info.payload['memory'])
6033
        else:
6034
          # Assume instance not running
6035
          # (there is a slight race condition here, but it's not very probable,
6036
          # and we have no other way to check)
6037
          current_mem = 0
6038
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6039
                    pninfo.payload['memory_free'])
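        # Example: asking for 4096 MiB when the instance currently uses
        # 1024 MiB and the primary node reports 2048 MiB free gives
        # miss_mem = 1024, so the request is refused below.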
6040
        if miss_mem > 0:
6041
          raise errors.OpPrereqError("This change will prevent the instance"
6042
                                     " from starting, due to %d MB of memory"
6043
                                     " missing on its primary node" % miss_mem)
6044

    
6045
      if be_new[constants.BE_AUTO_BALANCE]:
6046
        for node, nres in nodeinfo.items():
6047
          if node not in instance.secondary_nodes:
6048
            continue
6049
          msg = nres.fail_msg
6050
          if msg:
6051
            self.warn.append("Can't get info from secondary node %s: %s" %
6052
                             (node, msg))
6053
          elif not isinstance(nres.payload.get('memory_free', None), int):
6054
            self.warn.append("Secondary node %s didn't return free"
6055
                             " memory information" % node)
6056
          elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
6057
            self.warn.append("Not enough memory to failover instance to"
6058
                             " secondary node %s" % node)
6059

    
6060
    # NIC processing
6061
    self.nic_pnew = {}
6062
    self.nic_pinst = {}
6063
    for nic_op, nic_dict in self.op.nics:
6064
      if nic_op == constants.DDM_REMOVE:
6065
        if not instance.nics:
6066
          raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6067
        continue
6068
      if nic_op != constants.DDM_ADD:
6069
        # an existing nic
6070
        if nic_op < 0 or nic_op >= len(instance.nics):
6071
          raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6072
                                     " are 0 to %d" %
6073
                                     (nic_op, len(instance.nics)))
6074
        old_nic_params = instance.nics[nic_op].nicparams
6075
        old_nic_ip = instance.nics[nic_op].ip
6076
      else:
6077
        old_nic_params = {}
6078
        old_nic_ip = None
6079

    
6080
      update_params_dict = dict([(key, nic_dict[key])
6081
                                 for key in constants.NICS_PARAMETERS
6082
                                 if key in nic_dict])
6083

    
6084
      if 'bridge' in nic_dict:
6085
        update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
6086

    
6087
      new_nic_params, new_filled_nic_params = \
6088
          self._GetUpdatedParams(old_nic_params, update_params_dict,
6089
                                 cluster.nicparams[constants.PP_DEFAULT],
6090
                                 constants.NICS_PARAMETER_TYPES)
6091
      objects.NIC.CheckParameterSyntax(new_filled_nic_params)
6092
      self.nic_pinst[nic_op] = new_nic_params
6093
      self.nic_pnew[nic_op] = new_filled_nic_params
6094
      new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
6095

    
6096
      if new_nic_mode == constants.NIC_MODE_BRIDGED:
6097
        nic_bridge = new_filled_nic_params[constants.NIC_LINK]
6098
        msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
6099
        if msg:
6100
          msg = "Error checking bridges on node %s: %s" % (pnode, msg)
6101
          if self.force:
6102
            self.warn.append(msg)
6103
          else:
6104
            raise errors.OpPrereqError(msg)
6105
      if new_nic_mode == constants.NIC_MODE_ROUTED:
6106
        if 'ip' in nic_dict:
6107
          nic_ip = nic_dict['ip']
6108
        else:
6109
          nic_ip = old_nic_ip
6110
        if nic_ip is None:
6111
          raise errors.OpPrereqError('Cannot set the nic ip to None'
6112
                                     ' on a routed nic')
6113
      if 'mac' in nic_dict:
6114
        nic_mac = nic_dict['mac']
6115
        if nic_mac is None:
6116
          raise errors.OpPrereqError('Cannot set the nic mac to None')
6117
        elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6118
          # otherwise generate the mac
6119
          nic_dict['mac'] = self.cfg.GenerateMAC()
6120
        else:
6121
          # or validate/reserve the current one
6122
          if self.cfg.IsMacInUse(nic_mac):
6123
            raise errors.OpPrereqError("MAC address %s already in use"
6124
                                       " in cluster" % nic_mac)
6125

    
6126
    # DISK processing
6127
    if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6128
      raise errors.OpPrereqError("Disk operations not supported for"
6129
                                 " diskless instances")
6130
    for disk_op, disk_dict in self.op.disks:
6131
      if disk_op == constants.DDM_REMOVE:
6132
        if len(instance.disks) == 1:
6133
          raise errors.OpPrereqError("Cannot remove the last disk of"
6134
                                     " an instance")
6135
        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6136
        ins_l = ins_l[pnode]
6137
        msg = ins_l.fail_msg
6138
        if msg:
6139
          raise errors.OpPrereqError("Can't contact node %s: %s" %
6140
                                     (pnode, msg))
6141
        if instance.name in ins_l.payload:
6142
          raise errors.OpPrereqError("Instance is running, can't remove"
6143
                                     " disks.")
6144

    
6145
      if (disk_op == constants.DDM_ADD and
6146
          len(instance.disks) >= constants.MAX_DISKS):
6147
        raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6148
                                   " add more" % constants.MAX_DISKS)
6149
      if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6150
        # an existing disk
6151
        if disk_op < 0 or disk_op >= len(instance.disks):
6152
          raise errors.OpPrereqError("Invalid disk index %s, valid values"
6153
                                     " are 0 to %d" %
6154
                                     (disk_op, len(instance.disks)))
6155

    
6156
    return
6157

    
6158
  def Exec(self, feedback_fn):
6159
    """Modifies an instance.
6160

6161
    All parameters take effect only at the next restart of the instance.
6162

6163
    """
6164
    # Process here the warnings from CheckPrereq, as we don't have a
6165
    # feedback_fn there.
6166
    for warn in self.warn:
6167
      feedback_fn("WARNING: %s" % warn)
6168

    
6169
    result = []
6170
    instance = self.instance
6171
    cluster = self.cluster
6172
    # disk changes
6173
    for disk_op, disk_dict in self.op.disks:
6174
      if disk_op == constants.DDM_REMOVE:
6175
        # remove the last disk
6176
        device = instance.disks.pop()
6177
        device_idx = len(instance.disks)
6178
        for node, disk in device.ComputeNodeTree(instance.primary_node):
6179
          self.cfg.SetDiskID(disk, node)
6180
          msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
6181
          if msg:
6182
            self.LogWarning("Could not remove disk/%d on node %s: %s,"
6183
                            " continuing anyway", device_idx, node, msg)
6184
        result.append(("disk/%d" % device_idx, "remove"))
6185
      elif disk_op == constants.DDM_ADD:
6186
        # add a new disk
6187
        if instance.disk_template == constants.DT_FILE:
6188
          file_driver, file_path = instance.disks[0].logical_id
6189
          file_path = os.path.dirname(file_path)
6190
        else:
6191
          file_driver = file_path = None
6192
        disk_idx_base = len(instance.disks)
6193
        new_disk = _GenerateDiskTemplate(self,
6194
                                         instance.disk_template,
6195
                                         instance.name, instance.primary_node,
6196
                                         instance.secondary_nodes,
6197
                                         [disk_dict],
6198
                                         file_path,
6199
                                         file_driver,
6200
                                         disk_idx_base)[0]
6201
        instance.disks.append(new_disk)
6202
        info = _GetInstanceInfoText(instance)
6203

    
6204
        logging.info("Creating volume %s for instance %s",
6205
                     new_disk.iv_name, instance.name)
6206
        # Note: this needs to be kept in sync with _CreateDisks
6207
        #HARDCODE
6208
        for node in instance.all_nodes:
6209
          f_create = node == instance.primary_node
6210
          try:
6211
            _CreateBlockDev(self, node, instance, new_disk,
6212
                            f_create, info, f_create)
6213
          except errors.OpExecError, err:
6214
            self.LogWarning("Failed to create volume %s (%s) on"
6215
                            " node %s: %s",
6216
                            new_disk.iv_name, new_disk, node, err)
6217
        result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6218
                       (new_disk.size, new_disk.mode)))
6219
      else:
6220
        # change a given disk
6221
        instance.disks[disk_op].mode = disk_dict['mode']
6222
        result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6223
    # NIC changes
6224
    for nic_op, nic_dict in self.op.nics:
6225
      if nic_op == constants.DDM_REMOVE:
6226
        # remove the last nic
6227
        del instance.nics[-1]
6228
        result.append(("nic.%d" % len(instance.nics), "remove"))
6229
      elif nic_op == constants.DDM_ADD:
6230
        # mac and bridge should be set, by now
6231
        mac = nic_dict['mac']
6232
        ip = nic_dict.get('ip', None)
6233
        nicparams = self.nic_pinst[constants.DDM_ADD]
6234
        new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
6235
        instance.nics.append(new_nic)
6236
        result.append(("nic.%d" % (len(instance.nics) - 1),
6237
                       "add:mac=%s,ip=%s,mode=%s,link=%s" %
6238
                       (new_nic.mac, new_nic.ip,
6239
                        self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
6240
                        self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
6241
                       )))
6242
      else:
6243
        for key in 'mac', 'ip':
6244
          if key in nic_dict:
6245
            setattr(instance.nics[nic_op], key, nic_dict[key])
6246
        if nic_op in self.nic_pnew:
6247
          instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
6248
        for key, val in nic_dict.iteritems():
6249
          result.append(("nic.%s/%d" % (key, nic_op), val))
6250

    
6251
    # hvparams changes
6252
    if self.op.hvparams:
6253
      instance.hvparams = self.hv_inst
6254
      for key, val in self.op.hvparams.iteritems():
6255
        result.append(("hv/%s" % key, val))
6256

    
6257
    # beparams changes
6258
    if self.op.beparams:
6259
      instance.beparams = self.be_inst
6260
      for key, val in self.op.beparams.iteritems():
6261
        result.append(("be/%s" % key, val))
6262

    
6263
    self.cfg.Update(instance)
6264

    
6265
    return result
6266

    
6267

    
6268
class LUQueryExports(NoHooksLU):
6269
  """Query the exports list
6270

6271
  """
6272
  _OP_REQP = ['nodes']
6273
  REQ_BGL = False
6274

    
6275
  def ExpandNames(self):
6276
    self.needed_locks = {}
6277
    self.share_locks[locking.LEVEL_NODE] = 1
6278
    if not self.op.nodes:
6279
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6280
    else:
6281
      self.needed_locks[locking.LEVEL_NODE] = \
6282
        _GetWantedNodes(self, self.op.nodes)
6283

    
6284
  def CheckPrereq(self):
6285
    """Check prerequisites.
6286

6287
    """
6288
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6289

    
6290
  def Exec(self, feedback_fn):
6291
    """Compute the list of all the exported system images.
6292

6293
    @rtype: dict
6294
    @return: a dictionary with the structure node->(export-list)
6295
        where export-list is a list of the instances exported on
6296
        that node.
6297

6298
    """
6299
    rpcresult = self.rpc.call_export_list(self.nodes)
6300
    result = {}
6301
    for node in rpcresult:
6302
      if rpcresult[node].fail_msg:
6303
        result[node] = False
6304
      else:
6305
        result[node] = rpcresult[node].payload
6306

    
6307
    return result
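    # Nodes that failed the RPC call are mapped to False instead of a
    # list, e.g. {'node1': ['inst1.example.com'], 'node2': False}.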
6308

    
6309

    
6310
class LUExportInstance(LogicalUnit):
6311
  """Export an instance to an image in the cluster.
6312

6313
  """
6314
  HPATH = "instance-export"
6315
  HTYPE = constants.HTYPE_INSTANCE
6316
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
6317
  REQ_BGL = False
6318

    
6319
  def ExpandNames(self):
6320
    self._ExpandAndLockInstance()
6321
    # FIXME: lock only instance primary and destination node
6322
    #
6323
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
6329
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6330

    
6331
  def DeclareLocks(self, level):
6332
    """Last minute lock declaration."""
6333
    # All nodes are locked anyway, so nothing to do here.
6334

    
6335
  def BuildHooksEnv(self):
6336
    """Build hooks env.
6337

6338
    This will run on the master, primary node and target node.
6339

6340
    """
6341
    env = {
6342
      "EXPORT_NODE": self.op.target_node,
6343
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6344
      }
6345
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6346
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6347
          self.op.target_node]
6348
    return env, nl, nl
6349

    
6350
  def CheckPrereq(self):
6351
    """Check prerequisites.
6352

6353
    This checks that the instance and node names are valid.
6354

6355
    """
6356
    instance_name = self.op.instance_name
6357
    self.instance = self.cfg.GetInstanceInfo(instance_name)
6358
    assert self.instance is not None, \
6359
          "Cannot retrieve locked instance %s" % self.op.instance_name
6360
    _CheckNodeOnline(self, self.instance.primary_node)
6361

    
6362
    self.dst_node = self.cfg.GetNodeInfo(
6363
      self.cfg.ExpandNodeName(self.op.target_node))
6364

    
6365
    if self.dst_node is None:
6366
      # This is wrong node name, not a non-locked node
6367
      raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6368
    _CheckNodeOnline(self, self.dst_node.name)
6369
    _CheckNodeNotDrained(self, self.dst_node.name)
6370

    
6371
    # instance disk type verification
6372
    for disk in self.instance.disks:
6373
      if disk.dev_type == constants.LD_FILE:
6374
        raise errors.OpPrereqError("Export not supported for instances with"
6375
                                   " file-based disks")
6376

    
6377
  def Exec(self, feedback_fn):
6378
    """Export an instance to an image in the cluster.
6379

6380
    """
6381
    instance = self.instance
6382
    dst_node = self.dst_node
6383
    src_node = instance.primary_node
6384
    if self.op.shutdown:
6385
      # shutdown the instance, but not the disks
6386
      result = self.rpc.call_instance_shutdown(src_node, instance)
6387
      result.Raise("Could not shutdown instance %s on"
6388
                   " node %s" % (instance.name, src_node))
6389

    
6390
    vgname = self.cfg.GetVGName()
6391

    
6392
    snap_disks = []
6393

    
6394
    # set the disks ID correctly since call_instance_start needs the
6395
    # correct drbd minor to create the symlinks
6396
    for disk in instance.disks:
6397
      self.cfg.SetDiskID(disk, src_node)
6398

    
6399
    try:
6400
      for disk in instance.disks:
6401
        # result.payload will be a snapshot of an lvm leaf of the one we passed
6402
        result = self.rpc.call_blockdev_snapshot(src_node, disk)
6403
        msg = result.fail_msg
6404
        if msg:
6405
          self.LogWarning("Could not snapshot block device %s on node %s: %s",
6406
                          disk.logical_id[1], src_node, msg)
6407
          snap_disks.append(False)
6408
        else:
6409
          disk_id = (vgname, result.payload)
6410
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6411
                                 logical_id=disk_id, physical_id=disk_id,
6412
                                 iv_name=disk.iv_name)
6413
          snap_disks.append(new_dev)
6414

    
6415
    finally:
6416
      if self.op.shutdown and instance.admin_up:
6417
        result = self.rpc.call_instance_start(src_node, instance, None, None)
6418
        msg = result.fail_msg
6419
        if msg:
6420
          _ShutdownInstanceDisks(self, instance)
6421
          raise errors.OpExecError("Could not start instance: %s" % msg)
6422

    
6423
    # TODO: check for size
6424

    
6425
    cluster_name = self.cfg.GetClusterName()
6426
    for idx, dev in enumerate(snap_disks):
6427
      if dev:
6428
        result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6429
                                               instance, cluster_name, idx)
6430
        msg = result.fail_msg
6431
        if msg:
6432
          self.LogWarning("Could not export block device %s from node %s to"
6433
                          " node %s: %s", dev.logical_id[1], src_node,
6434
                          dst_node.name, msg)
6435
        msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
6436
        if msg:
6437
          self.LogWarning("Could not remove snapshot block device %s from node"
6438
                          " %s: %s", dev.logical_id[1], src_node, msg)
6439

    
6440
    result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6441
    msg = result.fail_msg
6442
    if msg:
6443
      self.LogWarning("Could not finalize export for instance %s"
6444
                      " on node %s: %s", instance.name, dst_node.name, msg)
6445

    
6446
    nodelist = self.cfg.GetNodeList()
6447
    nodelist.remove(dst_node.name)
6448

    
6449
    # on one-node clusters nodelist will be empty after the removal
6450
    # if we proceed the backup would be removed because OpQueryExports
6451
    # substitutes an empty list with the full cluster node list.
6452
    iname = instance.name
6453
    if nodelist:
6454
      exportlist = self.rpc.call_export_list(nodelist)
6455
      for node in exportlist:
6456
        if exportlist[node].fail_msg:
6457
          continue
6458
        if iname in exportlist[node].payload:
6459
          msg = self.rpc.call_export_remove(node, iname).fail_msg
6460
          if msg:
6461
            self.LogWarning("Could not remove older export for instance %s"
6462
                            " on node %s: %s", iname, node, msg)
6463

    
6464

    
6465
class LURemoveExport(NoHooksLU):
6466
  """Remove exports related to the named instance.
6467

6468
  """
6469
  _OP_REQP = ["instance_name"]
6470
  REQ_BGL = False
6471

    
6472
  def ExpandNames(self):
6473
    self.needed_locks = {}
6474
    # We need all nodes to be locked in order for RemoveExport to work, but we
6475
    # don't need to lock the instance itself, as nothing will happen to it (and
6476
    # we can remove exports also for a removed instance)
6477
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6478

    
6479
  def CheckPrereq(self):
6480
    """Check prerequisites.
6481
    """
6482
    pass
6483

    
6484
  def Exec(self, feedback_fn):
6485
    """Remove any export.
6486

6487
    """
6488
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6489
    # If the instance was not found we'll try with the name that was passed in.
6490
    # This will only work if it was an FQDN, though.
6491
    fqdn_warn = False
6492
    if not instance_name:
6493
      fqdn_warn = True
6494
      instance_name = self.op.instance_name
6495

    
6496
    locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
6497
    exportlist = self.rpc.call_export_list(locked_nodes)
6498
    found = False
6499
    for node in exportlist:
6500
      msg = exportlist[node].fail_msg
6501
      if msg:
6502
        self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
6503
        continue
6504
      if instance_name in exportlist[node].payload:
6505
        found = True
6506
        result = self.rpc.call_export_remove(node, instance_name)
6507
        msg = result.fail_msg
6508
        if msg:
6509
          logging.error("Could not remove export for instance %s"
6510
                        " on node %s: %s", instance_name, node, msg)
6511

    
6512
    if fqdn_warn and not found:
6513
      feedback_fn("Export not found. If trying to remove an export belonging"
6514
                  " to a deleted instance please use its Fully Qualified"
6515
                  " Domain Name.")
6516

    
6517

    
6518
class TagsLU(NoHooksLU):
6519
  """Generic tags LU.
6520

6521
  This is an abstract class which is the parent of all the other tags LUs.
6522

6523
  """
6524

    
6525
  def ExpandNames(self):
6526
    self.needed_locks = {}
6527
    if self.op.kind == constants.TAG_NODE:
6528
      name = self.cfg.ExpandNodeName(self.op.name)
6529
      if name is None:
6530
        raise errors.OpPrereqError("Invalid node name (%s)" %
6531
                                   (self.op.name,))
6532
      self.op.name = name
6533
      self.needed_locks[locking.LEVEL_NODE] = name
6534
    elif self.op.kind == constants.TAG_INSTANCE:
6535
      name = self.cfg.ExpandInstanceName(self.op.name)
6536
      if name is None:
6537
        raise errors.OpPrereqError("Invalid instance name (%s)" %
6538
                                   (self.op.name,))
6539
      self.op.name = name
6540
      self.needed_locks[locking.LEVEL_INSTANCE] = name
6541

    
6542
  def CheckPrereq(self):
6543
    """Check prerequisites.
6544

6545
    """
6546
    if self.op.kind == constants.TAG_CLUSTER:
6547
      self.target = self.cfg.GetClusterInfo()
6548
    elif self.op.kind == constants.TAG_NODE:
6549
      self.target = self.cfg.GetNodeInfo(self.op.name)
6550
    elif self.op.kind == constants.TAG_INSTANCE:
6551
      self.target = self.cfg.GetInstanceInfo(self.op.name)
6552
    else:
6553
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6554
                                 str(self.op.kind))
6555

    
6556

    
6557
class LUGetTags(TagsLU):
6558
  """Returns the tags of a given object.
6559

6560
  """
6561
  _OP_REQP = ["kind", "name"]
6562
  REQ_BGL = False
6563

    
6564
  def Exec(self, feedback_fn):
6565
    """Returns the tag list.
6566

6567
    """
6568
    return list(self.target.GetTags())
6569

    
6570

    
6571
class LUSearchTags(NoHooksLU):
6572
  """Searches the tags for a given pattern.
6573

6574
  """
6575
  _OP_REQP = ["pattern"]
6576
  REQ_BGL = False
6577

    
6578
  def ExpandNames(self):
6579
    self.needed_locks = {}
6580

    
6581
  def CheckPrereq(self):
6582
    """Check prerequisites.
6583

6584
    This checks the pattern passed for validity by compiling it.
6585

6586
    """
6587
    try:
6588
      self.re = re.compile(self.op.pattern)
6589
    except re.error, err:
6590
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6591
                                 (self.op.pattern, err))
6592

    
6593
  def Exec(self, feedback_fn):
6594
    """Returns the tag list.
6595

6596
    """
6597
    cfg = self.cfg
6598
    tgts = [("/cluster", cfg.GetClusterInfo())]
6599
    ilist = cfg.GetAllInstancesInfo().values()
6600
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6601
    nlist = cfg.GetAllNodesInfo().values()
6602
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6603
    results = []
6604
    for path, target in tgts:
6605
      for tag in target.GetTags():
6606
        if self.re.search(tag):
6607
          results.append((path, tag))
6608
    return results
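    # Example result for a pattern matching the tag "prod":
    # [("/cluster", "prod"), ("/instances/inst1.example.com", "prod")]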
6609

    
6610

    
6611
class LUAddTags(TagsLU):
6612
  """Sets a tag on a given object.
6613

6614
  """
6615
  _OP_REQP = ["kind", "name", "tags"]
6616
  REQ_BGL = False
6617

    
6618
  def CheckPrereq(self):
6619
    """Check prerequisites.
6620

6621
    This checks the type and length of the tag name and value.
6622

6623
    """
6624
    TagsLU.CheckPrereq(self)
6625
    for tag in self.op.tags:
6626
      objects.TaggableObject.ValidateTag(tag)
6627

    
6628
  def Exec(self, feedback_fn):
6629
    """Sets the tag.
6630

6631
    """
6632
    try:
6633
      for tag in self.op.tags:
6634
        self.target.AddTag(tag)
6635
    except errors.TagError, err:
6636
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
6637
    try:
6638
      self.cfg.Update(self.target)
6639
    except errors.ConfigurationError:
6640
      raise errors.OpRetryError("There has been a modification to the"
6641
                                " config file and the operation has been"
6642
                                " aborted. Please retry.")
6643

    
6644

    
6645
class LUDelTags(TagsLU):
6646
  """Delete a list of tags from a given object.
6647

6648
  """
6649
  _OP_REQP = ["kind", "name", "tags"]
6650
  REQ_BGL = False
6651

    
6652
  def CheckPrereq(self):
6653
    """Check prerequisites.
6654

6655
    This checks that we have the given tag.
6656

6657
    """
6658
    TagsLU.CheckPrereq(self)
6659
    for tag in self.op.tags:
6660
      objects.TaggableObject.ValidateTag(tag)
6661
    del_tags = frozenset(self.op.tags)
6662
    cur_tags = self.target.GetTags()
6663
    if not del_tags <= cur_tags:
6664
      diff_tags = del_tags - cur_tags
6665
      diff_names = ["'%s'" % tag for tag in diff_tags]
6666
      diff_names.sort()
6667
      raise errors.OpPrereqError("Tag(s) %s not found" %
6668
                                 (",".join(diff_names)))
6669

    
6670
  def Exec(self, feedback_fn):
6671
    """Remove the tag from the object.
6672

6673
    """
6674
    for tag in self.op.tags:
6675
      self.target.RemoveTag(tag)
6676
    try:
6677
      self.cfg.Update(self.target)
6678
    except errors.ConfigurationError:
6679
      raise errors.OpRetryError("There has been a modification to the"
6680
                                " config file and the operation has been"
6681
                                " aborted. Please retry.")
6682

    
6683

    
6684
class LUTestDelay(NoHooksLU):
6685
  """Sleep for a specified amount of time.
6686

6687
  This LU sleeps on the master and/or nodes for a specified amount of
6688
  time.
6689

6690
  """
6691
  _OP_REQP = ["duration", "on_master", "on_nodes"]
6692
  REQ_BGL = False
6693

    
6694
  def ExpandNames(self):
6695
    """Expand names and set required locks.
6696

6697
    This expands the node list, if any.
6698

6699
    """
6700
    self.needed_locks = {}
6701
    if self.op.on_nodes:
6702
      # _GetWantedNodes can be used here, but is not always appropriate to use
6703
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6704
      # more information.
6705
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6706
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6707

    
6708
  def CheckPrereq(self):
6709
    """Check prerequisites.
6710

6711
    """
6712

    
6713
  def Exec(self, feedback_fn):
6714
    """Do the actual sleep.
6715

6716
    """
6717
    if self.op.on_master:
6718
      if not utils.TestDelay(self.op.duration):
6719
        raise errors.OpExecError("Error during master delay test")
6720
    if self.op.on_nodes:
6721
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6722
      for node, node_result in result.items():
6723
        node_result.Raise("Failure during rpc call to node %s" % node)
6724

    
6725

    
6726
class IAllocator(object):
6727
  """IAllocator framework.
6728

6729
  An IAllocator instance has four sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
6739
  _ALLO_KEYS = [
6740
    "mem_size", "disks", "disk_template",
6741
    "os", "tags", "nics", "vcpus", "hypervisor",
6742
    ]
6743
  _RELO_KEYS = [
6744
    "relocate_from",
6745
    ]
6746

    
6747
  def __init__(self, lu, mode, name, **kwargs):
6748
    self.lu = lu
6749
    # init buffer variables
6750
    self.in_text = self.out_text = self.in_data = self.out_data = None
6751
    # init all input fields so that pylint is happy
6752
    self.mode = mode
6753
    self.name = name
6754
    self.mem_size = self.disks = self.disk_template = None
6755
    self.os = self.tags = self.nics = self.vcpus = None
6756
    self.hypervisor = None
6757
    self.relocate_from = None
6758
    # computed fields
6759
    self.required_nodes = None
6760
    # init result fields
6761
    self.success = self.info = self.nodes = None
6762
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6763
      keyset = self._ALLO_KEYS
6764
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6765
      keyset = self._RELO_KEYS
6766
    else:
6767
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6768
                                   " IAllocator" % self.mode)
6769
    for key in kwargs:
6770
      if key not in keyset:
6771
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
6772
                                     " IAllocator" % key)
6773
      setattr(self, key, kwargs[key])
6774
    for key in keyset:
6775
      if key not in kwargs:
6776
        raise errors.ProgrammerError("Missing input parameter '%s' to"
6777
                                     " IAllocator" % key)
6778
    self._BuildInputData()
6779

    
6780
  def _ComputeClusterData(self):
6781
    """Compute the generic allocator input data.
6782

6783
    This is the data that is independent of the actual operation.
6784

6785
    """
6786
    cfg = self.lu.cfg
6787
    cluster_info = cfg.GetClusterInfo()
6788
    # cluster data
6789
    data = {
6790
      "version": constants.IALLOCATOR_VERSION,
6791
      "cluster_name": cfg.GetClusterName(),
6792
      "cluster_tags": list(cluster_info.GetTags()),
6793
      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6794
      # we don't have job IDs
6795
      }
6796
    iinfo = cfg.GetAllInstancesInfo().values()
6797
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6798

    
6799
    # node data
6800
    node_results = {}
6801
    node_list = cfg.GetNodeList()
6802

    
6803
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6804
      hypervisor_name = self.hypervisor
6805
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6806
      hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6807

    
6808
    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6809
                                           hypervisor_name)
6810
    node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6811
                       cluster_info.enabled_hypervisors)
6812
    for nname, nresult in node_data.items():
6813
      # first fill in static (config-based) values
6814
      ninfo = cfg.GetNodeInfo(nname)
6815
      pnr = {
6816
        "tags": list(ninfo.GetTags()),
6817
        "primary_ip": ninfo.primary_ip,
6818
        "secondary_ip": ninfo.secondary_ip,
6819
        "offline": ninfo.offline,
6820
        "drained": ninfo.drained,
6821
        "master_candidate": ninfo.master_candidate,
6822
        }
6823

    
6824
      if not ninfo.offline:
6825
        nresult.Raise("Can't get data for node %s" % nname)
6826
        node_iinfo[nname].Raise("Can't get node instance info from node %s" %
6827
                                nname)
6828
        remote_info = nresult.payload
6829
        for attr in ['memory_total', 'memory_free', 'memory_dom0',
6830
                     'vg_size', 'vg_free', 'cpu_total']:
6831
          if attr not in remote_info:
6832
            raise errors.OpExecError("Node '%s' didn't return attribute"
6833
                                     " '%s'" % (nname, attr))
6834
          if not isinstance(remote_info[attr], int):
6835
            raise errors.OpExecError("Node '%s' returned invalid value"
6836
                                     " for '%s': %s" %
6837
                                     (nname, attr, remote_info[attr]))
6838
        # compute memory used by primary instances
6839
        i_p_mem = i_p_up_mem = 0
6840
        for iinfo, beinfo in i_list:
6841
          if iinfo.primary_node == nname:
6842
            i_p_mem += beinfo[constants.BE_MEMORY]
6843
            if iinfo.name not in node_iinfo[nname].payload:
6844
              i_used_mem = 0
6845
            else:
6846
              i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
6847
            i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6848
            remote_info['memory_free'] -= max(0, i_mem_diff)
6849

    
6850
            if iinfo.admin_up:
6851
              i_p_up_mem += beinfo[constants.BE_MEMORY]
6852

    
6853
        # compute memory used by instances
6854
        pnr_dyn = {
6855
          "total_memory": remote_info['memory_total'],
6856
          "reserved_memory": remote_info['memory_dom0'],
6857
          "free_memory": remote_info['memory_free'],
6858
          "total_disk": remote_info['vg_size'],
6859
          "free_disk": remote_info['vg_free'],
6860
          "total_cpus": remote_info['cpu_total'],
6861
          "i_pri_memory": i_p_mem,
6862
          "i_pri_up_memory": i_p_up_mem,
6863
          }
6864
        pnr.update(pnr_dyn)
6865

    
6866
      node_results[nname] = pnr
6867
    data["nodes"] = node_results
6868

    
6869
    # instance data
6870
    instance_data = {}
6871
    for iinfo, beinfo in i_list:
6872
      nic_data = []
6873
      for nic in iinfo.nics:
6874
        filled_params = objects.FillDict(
6875
            cluster_info.nicparams[constants.PP_DEFAULT],
6876
            nic.nicparams)
6877
        nic_dict = {"mac": nic.mac,
6878
                    "ip": nic.ip,
6879
                    "mode": filled_params[constants.NIC_MODE],
6880
                    "link": filled_params[constants.NIC_LINK],
6881
                   }
6882
        if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
6883
          nic_dict["bridge"] = filled_params[constants.NIC_LINK]
6884
        nic_data.append(nic_dict)
6885
      pir = {
6886
        "tags": list(iinfo.GetTags()),
6887
        "admin_up": iinfo.admin_up,
6888
        "vcpus": beinfo[constants.BE_VCPUS],
6889
        "memory": beinfo[constants.BE_MEMORY],
6890
        "os": iinfo.os,
6891
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6892
        "nics": nic_data,
6893
        "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6894
        "disk_template": iinfo.disk_template,
6895
        "hypervisor": iinfo.hypervisor,
6896
        }
6897
      pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
6898
                                                 pir["disks"])
6899
      instance_data[iinfo.name] = pir
6900

    
6901
    data["instances"] = instance_data
6902

    
6903
    self.in_data = data
6904

    
6905
  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This, in combination with _ComputeClusterData, will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data

    disk_space = _ComputeDiskSize(self.disk_template, self.disks)

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

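  # Illustrative sketch (comments only, not executed): an "allocate"
  # request as built above, with made-up values (the nics are shown in
  # the simple mac/ip/bridge form that LUTestAllocator accepts):
  #
  #   {
  #     "type": "allocate",
  #     "name": "inst2.example.com",
  #     "disk_template": "drbd",
  #     "tags": [],
  #     "os": "debian-image",
  #     "vcpus": 1,
  #     "memory": 512,
  #     "disks": [{"size": 10240, "mode": "w"}],
  #     "disk_space_total": 10368,
  #     "nics": [{"mac": "auto", "ip": None, "bridge": None}],
  #     "required_nodes": 2,
  #   }
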
  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This, in combination with _ComputeClusterData, will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.lu.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

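  # Illustrative sketch (comments only, not executed): a "relocate"
  # request as built above, with made-up values:
  #
  #   {
  #     "type": "relocate",
  #     "name": "inst1.example.com",
  #     "disk_space_total": 10368,
  #     "required_nodes": 1,
  #     "relocate_from": ["node2.example.com"],
  #   }
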
  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

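  # At this point self.in_text is the serialized form of self.in_data,
  # i.e. one document containing the cluster data plus the request built
  # above; this is the exact text that Run() hands to the iallocator
  # runner RPC.
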
  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and store the results.

    """
    if call_fn is None:
      call_fn = self.lu.rpc.call_iallocator_runner
    data = self.in_text

    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
    result.Raise("Failure while running the iallocator script")

    self.out_text = result.payload
    if validate:
      self._ValidateResult()

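  # Illustrative sketch (comments only, not executed), assuming an
  # iallocator script named "my-allocator" is installed; the LU ("lu"),
  # instance and node names are made up:
  #
  #   ial = IAllocator(lu, mode=constants.IALLOCATOR_MODE_RELOC,
  #                    name="inst1.example.com",
  #                    relocate_from=["node2.example.com"])
  #   ial.Run("my-allocator")
  #   if ial.success:
  #     target_nodes = ial.nodes
  #
  # The call_fn parameter exists so that callers (e.g. tests) can swap in
  # a replacement for the normal RPC call; it must accept the same
  # (master_node, name, in_text) arguments and return an object with a
  # Raise() method and a payload attribute, like the regular RPC result.
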
  def _ValidateResult(self):
    """Process the allocator results.

    This will parse the result document and, if it is well-formed, save
    it in self.out_data and set the success, info and nodes attributes.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict
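
  # Illustrative sketch (comments only, not executed): a minimal result
  # that passes the checks above, shown as the already-parsed dictionary
  # (all values made up):
  #
  #   {
  #     "success": True,
  #     "info": "allocation successful",
  #     "nodes": ["node1.example.com", "node3.example.com"],
  #   }
  #
  # After validation, self.success, self.info and self.nodes hold these
  # values and self.out_data holds the whole dictionary.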


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU exercises the instance allocator framework without changing the
  cluster: depending on the requested direction it either returns the
  generated allocator input or runs the named allocator and returns its
  raw output.

  """
  _OP_REQP = ["direction", "mode", "name"]

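  # Illustrative sketch (comments only, not executed): an allocation-mode
  # test opcode carrying the fields that CheckPrereq below requires; the
  # opcode class name (opcodes.OpTestAllocator) is the expected
  # counterpart of this LU, and all values are made up:
  #
  #   op = opcodes.OpTestAllocator(direction=constants.IALLOCATOR_DIR_IN,
  #                                mode=constants.IALLOCATOR_MODE_ALLOC,
  #                                name="inst3.example.com",
  #                                mem_size=512,
  #                                disks=[{"size": 10240, "mode": "w"}],
  #                                disk_template="drbd",
  #                                os="debian-image", tags=[], vcpus=1,
  #                                nics=[{"mac": "auto", "ip": None,
  #                                       "bridge": None}])
  #
  # With IALLOCATOR_DIR_IN the LU returns the generated allocator input
  # text; with IALLOCATOR_DIR_OUT an allocator name is also required and
  # the LU returns the script's raw output.
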
  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and
    mode of the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
      if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    else:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result