#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import os
import os.path
import time
import tempfile
import re
import platform
import logging
import copy
import random

from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import serializer
from ganeti import ssconf


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_BGL = True

  def __init__(self, processor, op, context, rpc):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.context = context
    self.rpc = rpc
    # Dicts used to declare locking needs to mcpu
    self.needed_locks = None
    self.acquired_locks = {}
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    self.add_locks = {}
    self.remove_locks = {}
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None
    # logging
    self.LogWarning = processor.LogWarning
    self.LogInfo = processor.LogInfo

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
    self.CheckArguments()

  def __GetSSH(self):
    """Returns the SshRunner object

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckArguments(self):
    """Check syntactic validity for the opcode arguments.

    This method is for doing a simple syntactic check and ensuring the
    validity of opcode parameters, without any cluster-related
    checks. While the same can be accomplished in ExpandNames and/or
    CheckPrereq, doing these separately is better because:

      - ExpandNames is left as purely a lock-related function
      - CheckPrereq is run after we have acquired locks (and possibly
        waited for them)

    The function is allowed to change the self.op attribute so that
    later methods can no longer worry about missing parameters.

    """
    pass

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:

      - use an empty dict if you don't need any lock
      - if you don't need any lock at a particular level omit that level
      - don't put anything for the BGL level
      - if you want all locks at a level use locking.ALL_SET as a value

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples::

      # Acquire all nodes and one instance
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_INSTANCE: ['instance1.example.tld'],
      }
      # Acquire just two nodes
      self.needed_locks = {
        locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
      }
      # Acquire no locks
      self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have the 'GANETI_' prefix as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks.  By default the method does nothing and the
    previous result is passed back unchanged, but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hook_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: the previous Exec result this LU had, or None
        in the PRE phase
    @return: the new Exec result, based on the previous result
        and hook results

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                  self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self, primary_only=False):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primary or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to::

      if level == locking.LEVEL_NODE:
        self._LockInstancesNodes()

    @type primary_only: boolean
    @param primary_only: only lock primary nodes of locked instances

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      if not primary_only:
        wanted_nodes.extend(instance.secondary_nodes)

    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)

    del self.recalculate_locks[locking.LEVEL_NODE]

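
# Illustrative sketch (not part of the original module): a minimal LU
# subclass wired up the way the LogicalUnit docstrings above describe.
# The class name, opcode fields and hook path used here are hypothetical;
# only the LogicalUnit, locking, constants and errors APIs shown in this
# file are assumed.
class _LUExampleSketch(LogicalUnit):
  """Hypothetical LU that does nothing; it only shows the wiring."""
  HPATH = "example-noop"            # hooks would be looked up under this path
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["instance_name"]      # __init__ checks these exist on the opcode
  REQ_BGL = False                   # concurrent LU, so ExpandNames is required

  def ExpandNames(self):
    # Canonicalize self.op.instance_name and declare the instance lock.
    self._ExpandAndLockInstance()
    # Node locks are computed later, once the instance lock is held.
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def BuildHooksEnv(self):
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node]
    return env, nl, nl

  def Exec(self, feedback_fn):
    feedback_fn("Nothing to do for %s" % self.instance.name)

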
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.OpPrereqError: if the nodes parameter is wrong type

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if not nodes:
    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
      " non-empty list of nodes whose name is to be expanded.")

  wanted = []
  for name in nodes:
    node = lu.cfg.ExpandNodeName(name)
    if node is None:
      raise errors.OpPrereqError("No such node name '%s'" % name)
    wanted.append(node)

  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)

  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
  return wanted


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta))


418
  """Validates boolean opcode parameters.
419

420
  This will ensure that an opcode parameter is either a boolean value,
421
  or None (but that it always exists).
422

423
  """
424
  val = getattr(op, name, None)
425
  if not (val is None or isinstance(val, bool)):
426
    raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
427
                               (name, str(val)))
428
  setattr(op, name, val)
429

    
430

    
431
def _CheckNodeOnline(lu, node):
432
  """Ensure that a given node is online.
433

434
  @param lu: the LU on behalf of which we make the check
435
  @param node: the node to check
436
  @raise errors.OpPrereqError: if the node is offline
437

438
  """
439
  if lu.cfg.GetNodeInfo(node).offline:
440
    raise errors.OpPrereqError("Can't use offline node %s" % node)
441

    
442

    
443
def _CheckNodeNotDrained(lu, node):
444
  """Ensure that a given node is not drained.
445

446
  @param lu: the LU on behalf of which we make the check
447
  @param node: the node to check
448
  @raise errors.OpPrereqError: if the node is drained
449

450
  """
451
  if lu.cfg.GetNodeInfo(node).drained:
452
    raise errors.OpPrereqError("Can't use drained node %s" % node)
453

    
454

    
455
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
456
                          memory, vcpus, nics, disk_template, disks):
457
  """Builds instance related env variables for hooks
458

459
  This builds the hook environment from individual variables.
460

461
  @type name: string
462
  @param name: the name of the instance
463
  @type primary_node: string
464
  @param primary_node: the name of the instance's primary node
465
  @type secondary_nodes: list
466
  @param secondary_nodes: list of secondary nodes as strings
467
  @type os_type: string
468
  @param os_type: the name of the instance's OS
469
  @type status: boolean
470
  @param status: the should_run status of the instance
471
  @type memory: string
472
  @param memory: the memory size of the instance
473
  @type vcpus: string
474
  @param vcpus: the count of VCPUs the instance has
475
  @type nics: list
476
  @param nics: list of tuples (ip, bridge, mac) representing
477
      the NICs the instance  has
478
  @type disk_template: string
479
  @param disk_template: the distk template of the instance
480
  @type disks: list
481
  @param disks: the list of (size, mode) pairs
482
  @rtype: dict
483
  @return: the hook environment for this instance
484

485
  """
486
  if status:
487
    str_status = "up"
488
  else:
489
    str_status = "down"
490
  env = {
491
    "OP_TARGET": name,
492
    "INSTANCE_NAME": name,
493
    "INSTANCE_PRIMARY": primary_node,
494
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
495
    "INSTANCE_OS_TYPE": os_type,
496
    "INSTANCE_STATUS": str_status,
497
    "INSTANCE_MEMORY": memory,
498
    "INSTANCE_VCPUS": vcpus,
499
    "INSTANCE_DISK_TEMPLATE": disk_template,
500
  }
501

    
502
  if nics:
503
    nic_count = len(nics)
504
    for idx, (ip, bridge, mac) in enumerate(nics):
505
      if ip is None:
506
        ip = ""
507
      env["INSTANCE_NIC%d_IP" % idx] = ip
508
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
509
      env["INSTANCE_NIC%d_MAC" % idx] = mac
510
  else:
511
    nic_count = 0
512

    
513
  env["INSTANCE_NIC_COUNT"] = nic_count
514

    
515
  if disks:
516
    disk_count = len(disks)
517
    for idx, (size, mode) in enumerate(disks):
518
      env["INSTANCE_DISK%d_SIZE" % idx] = size
519
      env["INSTANCE_DISK%d_MODE" % idx] = mode
520
  else:
521
    disk_count = 0
522

    
523
  env["INSTANCE_DISK_COUNT"] = disk_count
524

    
525
  return env
526

    
527

    
528
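# Illustrative sketch (not part of the original module): for a hypothetical
# instance with one NIC and one disk, the dict returned above would look
# roughly like this (all values are made up; the hooks runner later adds
# the 'GANETI_' prefix to every key):
#
#   {
#     "OP_TARGET": "inst1.example.com",
#     "INSTANCE_NAME": "inst1.example.com",
#     "INSTANCE_PRIMARY": "node1.example.com",
#     "INSTANCE_SECONDARIES": "node2.example.com",
#     "INSTANCE_OS_TYPE": "debootstrap",
#     "INSTANCE_STATUS": "up",
#     "INSTANCE_MEMORY": 512,
#     "INSTANCE_VCPUS": 1,
#     "INSTANCE_DISK_TEMPLATE": "drbd",
#     "INSTANCE_NIC_COUNT": 1,
#     "INSTANCE_NIC0_IP": "",
#     "INSTANCE_NIC0_BRIDGE": "xen-br0",
#     "INSTANCE_NIC0_MAC": "aa:00:00:11:22:33",
#     "INSTANCE_DISK_COUNT": 1,
#     "INSTANCE_DISK0_SIZE": 10240,
#     "INSTANCE_DISK0_MODE": "rw",
#   }

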
def _BuildInstanceHookEnvByObject(lu, instance, override=None):
  """Builds instance related env variables for hooks from an object.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for which we should build the
      environment
  @type override: dict
  @param override: dictionary with key/values that will override
      our values
  @rtype: dict
  @return: the hook environment dictionary

  """
  bep = lu.cfg.GetClusterInfo().FillBE(instance)
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.admin_up,
    'memory': bep[constants.BE_MEMORY],
    'vcpus': bep[constants.BE_VCPUS],
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
    'disk_template': instance.disk_template,
    'disks': [(disk.size, disk.mode) for disk in instance.disks],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _AdjustCandidatePool(lu):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool()
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               ", ".join(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def _CheckNicsBridgesExist(lu, target_nics, target_node,
                               profile=constants.PP_DEFAULT):
  """Check that the bridges needed by a list of nics exist.

  """
  c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
  paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
                for nic in target_nics]
  brlist = [params[constants.NIC_LINK] for params in paramslist
            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
  if brlist:
    result = lu.rpc.call_bridges_exist(target_node, brlist)
    result.Raise()
    if not result.data:
      raise errors.OpPrereqError("One or more target bridges %s does not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))


def _CheckInstanceBridgesExist(lu, instance, node=None):
  """Check that the bridges needed by an instance exist.

  """
  if node is None:
    node = instance.primary_node
  _CheckNicsBridgesExist(lu, instance.nics, node)


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    result.Raise()
    if not result.data:
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    return master


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
                  node_result, feedback_fn, master_files,
                  drbd_map, vg_name):
    """Run multiple tests against a node.

    Test list:

      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    @type nodeinfo: L{objects.Node}
    @param nodeinfo: the node to check
    @param file_list: required list of files
    @param local_cksum: dictionary of local files and their checksums
    @param node_result: the results from the node
    @param feedback_fn: function used to accumulate results
    @param master_files: list of files that only masters should have
    @param drbd_map: the used drbd minors for this node, in
        the form of minor: (instance, must_exist), which correspond to
        instances and their running status
    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())

    """
    node = nodeinfo.name

    # main result, node_result should be a non-empty dict
    if not node_result or not isinstance(node_result, dict):
      feedback_fn("  - ERROR: unable to verify node %s." % (node,))
      return True

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = node_result.get('version', None)
    if not (remote_version and isinstance(remote_version, (list, tuple)) and
            len(remote_version) == 2):
      feedback_fn("  - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version[0]:
      feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
                  " node %s %s" % (local_version, node, remote_version[0]))
      return True

    # node seems compatible, we can actually try to look into its results

    bad = False

    # full package version
    if constants.RELEASE_VERSION != remote_version[1]:
      feedback_fn("  - WARNING: software version mismatch: master %s,"
                  " node %s %s" %
                  (constants.RELEASE_VERSION, node, remote_version[1]))

    # checks vg existence and size > 20G
    if vg_name is not None:
      vglist = node_result.get(constants.NV_VGLIST, None)
      if not vglist:
        feedback_fn("  - ERROR: unable to check volume groups on node %s." %
                        (node,))
        bad = True
      else:
        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
          bad = True

    # checks config file checksum

    remote_cksum = node_result.get(constants.NV_FILELIST, None)
    if not isinstance(remote_cksum, dict):
      bad = True
      feedback_fn("  - ERROR: node hasn't returned file checksum data")
    else:
      for file_name in file_list:
        node_is_mc = nodeinfo.master_candidate
        must_have_file = file_name not in master_files
        if file_name not in remote_cksum:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          if node_is_mc or must_have_file:
            bad = True
            feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
          else:
            # not candidate and this is not a must-have file
            bad = True
            feedback_fn("  - ERROR: non master-candidate has old/wrong file"
                        " '%s'" % file_name)
        else:
          # all good, except non-master/non-must have combination
          if not node_is_mc and not must_have_file:
            feedback_fn("  - ERROR: file '%s' should not exist on non master"
                        " candidates" % file_name)

    # checks ssh to any

    if constants.NV_NODELIST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result[constants.NV_NODELIST]:
        bad = True
        for node in node_result[constants.NV_NODELIST]:
          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                          (node, node_result[constants.NV_NODELIST][node]))

    if constants.NV_NODENETTEST not in node_result:
      bad = True
      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result[constants.NV_NODENETTEST]:
        bad = True
        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
        for node in nlist:
          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
                          (node, node_result[constants.NV_NODENETTEST][node]))

    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
    if isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        if hv_result is not None:
          feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
                      (hv_name, hv_result))

    # check used drbd list
    if vg_name is not None:
      used_minors = node_result.get(constants.NV_DRBDLIST, [])
      if not isinstance(used_minors, (tuple, list)):
        feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
                    str(used_minors))
      else:
        for minor, (iname, must_exist) in drbd_map.items():
          if minor not in used_minors and must_exist:
            feedback_fn("  - ERROR: drbd minor %d of instance %s is"
                        " not active" % (minor, iname))
            bad = True
        for minor in used_minors:
          if minor not in drbd_map:
            feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
                        minor)
            bad = True

    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn, n_offline):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      if node in n_offline:
        # ignore missing volumes on offline nodes
        continue
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn("  - ERROR: volume %s missing on node %s" %
                          (volume, node))
          bad = True

    if instanceconfig.admin_up:
      if ((node_current not in node_instance or
          not instance in node_instance[node_current]) and
          node_current not in n_offline):
        feedback_fn("  - ERROR: instance %s not running on node %s" %
                        (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn("  - ERROR: instance %s should not run on node %s" %
                          (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn("  - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn("  - ERROR: instance %s on node %s should not exist" %
                          (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to, should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MEMORY]
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

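  # Illustrative sketch (not part of the original module): given the
  # hypothetical node_info entry below, the loop above would flag node2,
  # because losing node1 would require 768 MB on node2 while only 512 MB
  # are free:
  #
  #   node_info["node2"] = {
  #     "mfree": 512,
  #     "pinst": [], "sinst": ["inst1", "inst2"],
  #     "sinst-by-pnode": {"node1": ["inst1", "inst2"]},  # 512 MB + 256 MB
  #   }
  #
  # Only instances whose BE_AUTO_BALANCE parameter is true count towards
  # needed_mem.
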
  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase, and their failure
    makes the output be logged in the verify output and the verification
    to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
      }
    for node in self.cfg.GetAllNodesInfo().values():
      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())

    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn("  - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
                        for iname in instancelist)
    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    n_offline = [] # List of offline nodes
    n_drained = [] # List of nodes being drained
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    master_files = [constants.CLUSTER_CONF_FILE]

    file_names = ssconf.SimpleStore().GetFileList()
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.RAPI_CERT_FILE)
    file_names.extend(master_files)

    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    node_verify_param = {
      constants.NV_FILELIST: file_names,
      constants.NV_NODELIST: [node.name for node in nodeinfo
                              if not node.offline],
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_NODENETTEST: [(node.name, node.primary_ip,
                                  node.secondary_ip) for node in nodeinfo
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      }
    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_DRBDLIST] = None
    all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                           self.cfg.GetClusterName())

    cluster = self.cfg.GetClusterInfo()
    master_node = self.cfg.GetMasterNode()
    all_drbd_map = self.cfg.ComputeDRBDMap()

    for node_i in nodeinfo:
      node = node_i.name
      nresult = all_nvinfo[node].data

      if node_i.offline:
        feedback_fn("* Skipping offline node %s" % (node,))
        n_offline.append(node)
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained.append(node)
      else:
        ntype = "regular"
      feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      if all_nvinfo[node].failed or not isinstance(nresult, dict):
        feedback_fn("  - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_drbd = {}
      for minor, instance in all_drbd_map[node].items():
        if instance not in instanceinfo:
          feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
                      instance)
          # ghost instance should not be running, but otherwise we
          # don't give double warnings (both ghost instance and
          # unallocated minor in use)
          node_drbd[minor] = (instance, False)
        else:
          instance = instanceinfo[instance]
          node_drbd[minor] = (instance.name, instance.admin_up)
      result = self._VerifyNode(node_i, file_names, local_checksums,
                                nresult, feedback_fn, master_files,
                                node_drbd, vg_name)
      bad = bad or result

      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
      if vg_name is None:
        node_volume[node] = {}
      elif isinstance(lvdata, basestring):
        feedback_fn("  - ERROR: LVM problem on node %s: %s" %
                    (node, utils.SafeEncode(lvdata)))
        bad = True
        node_volume[node] = {}
      elif not isinstance(lvdata, dict):
        feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = lvdata

      # node_instance
      idata = nresult.get(constants.NV_INSTANCELIST, None)
      if not isinstance(idata, list):
        feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
                    (node,))
        bad = True
        continue

      node_instance[node] = idata

      # node_info
      nodeinfo = nresult.get(constants.NV_HVINFO, None)
      if not isinstance(nodeinfo, dict):
        feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary.  this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
        }
        # FIXME: devise a free space model for file based instances as well
        if vg_name is not None:
          if (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST]):
            feedback_fn("  - ERROR: node %s didn't return data for the"
                        " volume group '%s' - it is either missing or broken" %
                        (node, vg_name))
            bad = True
            continue
          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
      except (ValueError, KeyError):
        feedback_fn("  - ERROR: invalid nodeinfo value returned"
                    " from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = instanceinfo[instance]
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn, n_offline)
      bad = bad or result
      inst_nodes_offline = []

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      elif pnode not in n_offline:
        feedback_fn("  - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      if pnode in n_offline:
        inst_nodes_offline.append(pnode)

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn("  - WARNING: multiple secondaries for instance %s"
                    % instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        elif snode not in n_offline:
          feedback_fn("  - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))
          bad = True
        if snode in n_offline:
          inst_nodes_offline.append(snode)

      if inst_nodes_offline:
        # warn that the instance lives on offline nodes, and set bad=True
        feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
                    ", ".join(inst_nodes_offline))
        bad = True

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn("  - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res.failed or res.data is False or not isinstance(res.data, list):
            if res.offline:
              # no need to warn or set fail return value
              continue
            feedback_fn("    Communication failure in hooks execution")
            lu_result = 1
            continue
          for script, hkr, output in res.data:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
    }
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (not inst.admin_up or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = self.rpc.call_volume_list(nodes, vg_name)

    to_act = set()
    for node in nodes:
      # node_volume
      lvs = node_lvs[node]
      if lvs.failed:
        if not lvs.offline:
          self.LogWarning("Connection to node %s failed: %s" %
                          (node, lvs.data))
        continue
      lvs = lvs.data
      if isinstance(lvs, basestring):
        logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
        res_nlvm[node] = lvs
        continue
      elif not isinstance(lvs, dict):
        logging.warning("Connection to node %s failed or invalid data"
                        " returned", node)
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


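# Illustrative note (not part of the original module): the value returned by
# LUVerifyDisks.Exec above unpacks as
#
#   (res_nodes, res_nlvm, res_instances, res_missing)
#
# where res_nodes lists nodes that returned unusable volume data, res_nlvm
# maps nodes to the LVM error string they reported, res_instances lists
# instances that have logical volumes which are not online, and res_missing
# maps instance names to the (node, volume) pairs that were expected but not
# found on any node.

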
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.cfg.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip

    # shutdown the master IP
    master = self.cfg.GetMasterNode()
    result = self.rpc.call_node_stop_master(master, False)
    if result.failed or not result.data:
      raise errors.OpExecError("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = ip
      self.cfg.Update(cluster)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetNodeList()
      try:
        node_list.remove(master)
      except ValueError:
        pass
      result = self.rpc.call_upload_file(node_list,
                                         constants.SSH_KNOWN_HOSTS_FILE)
      for to_node, to_result in result.iteritems():
        msg = to_result.RemoteFailMsg()
        if msg:
          msg = ("Copy of file %s to node %s failed: %s" %
                 (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
          self.proc.LogWarning(msg)

    finally:
      result = self.rpc.call_node_start_master(master, False)
      if result.failed or not result.data:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  @type disk: L{objects.Disk}
  @param disk: the disk to check
  @rtype: boolean
  @return: boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV


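# Illustrative usage sketch (not part of the original module): disabling the
# cluster volume group is only safe if no instance disk is LVM-backed, which
# is how LUSetClusterParams below uses this helper. Expressed compactly:
#
#   if any(_RecursiveCheckIfLVMBased(disk)
#          for inst in self.cfg.GetAllInstancesInfo().values()
#          for disk in inst.disks):
#     raise errors.OpPrereqError("Cannot disable lvm storage while"
#                                " lvm-based instances exist")
#
# The real check below iterates explicitly instead, but the effect is the
# same.

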
class LUSetClusterParams(LogicalUnit):
1428
  """Change the parameters of the cluster.
1429

1430
  """
1431
  HPATH = "cluster-modify"
1432
  HTYPE = constants.HTYPE_CLUSTER
1433
  _OP_REQP = []
1434
  REQ_BGL = False
1435

    
1436
  def CheckArguments(self):
1437
    """Check parameters
1438

1439
    """
1440
    if not hasattr(self.op, "candidate_pool_size"):
1441
      self.op.candidate_pool_size = None
1442
    if self.op.candidate_pool_size is not None:
1443
      try:
1444
        self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1445
      except (ValueError, TypeError), err:
1446
        raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1447
                                   str(err))
1448
      if self.op.candidate_pool_size < 1:
1449
        raise errors.OpPrereqError("At least one master candidate needed")
1450

    
1451
  def ExpandNames(self):
1452
    # FIXME: in the future maybe other cluster params won't require checking on
1453
    # all nodes to be modified.
1454
    self.needed_locks = {
1455
      locking.LEVEL_NODE: locking.ALL_SET,
1456
    }
1457
    self.share_locks[locking.LEVEL_NODE] = 1
1458

    
1459
  def BuildHooksEnv(self):
1460
    """Build hooks env.
1461

1462
    """
1463
    env = {
1464
      "OP_TARGET": self.cfg.GetClusterName(),
1465
      "NEW_VG_NAME": self.op.vg_name,
1466
      }
1467
    mn = self.cfg.GetMasterNode()
1468
    return env, [mn], [mn]
1469

    
1470
  def CheckPrereq(self):
1471
    """Check prerequisites.
1472

1473
    This checks whether the given params don't conflict and
1474
    if the given volume group is valid.
1475

1476
    """
1477
    if self.op.vg_name is not None and not self.op.vg_name:
1478
      instances = self.cfg.GetAllInstancesInfo().values()
1479
      for inst in instances:
1480
        for disk in inst.disks:
1481
          if _RecursiveCheckIfLVMBased(disk):
1482
            raise errors.OpPrereqError("Cannot disable lvm storage while"
1483
                                       " lvm-based instances exist")
1484

    
1485
    node_list = self.acquired_locks[locking.LEVEL_NODE]
1486

    
1487
    # if vg_name not None, checks given volume group on all nodes
1488
    if self.op.vg_name:
1489
      vglist = self.rpc.call_vg_list(node_list)
1490
      for node in node_list:
1491
        if vglist[node].failed:
1492
          # ignoring down node
1493
          self.LogWarning("Node %s unreachable/error, ignoring" % node)
1494
          continue
1495
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1496
                                              self.op.vg_name,
1497
                                              constants.MIN_VG_SIZE)
1498
        if vgstatus:
1499
          raise errors.OpPrereqError("Error on node '%s': %s" %
1500
                                     (node, vgstatus))
1501

    
1502
    self.cluster = cluster = self.cfg.GetClusterInfo()
1503
    # validate params changes
1504
    if self.op.beparams:
1505
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1506
      self.new_beparams = objects.FillDict(
1507
        cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1508

    
1509
    if self.op.nicparams:
1510
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1511
      self.new_nicparams = objects.FillDict(
1512
        cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1513
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
1514

    
1515
    # hypervisor list/parameters
1516
    self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1517
    if self.op.hvparams:
1518
      if not isinstance(self.op.hvparams, dict):
1519
        raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1520
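      # merge the new settings per hypervisor: known hypervisors get their
      # dict updated key by key, new ones are added wholesale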
      for hv_name, hv_dict in self.op.hvparams.items():
1521
        if hv_name not in self.new_hvparams:
1522
          self.new_hvparams[hv_name] = hv_dict
1523
        else:
1524
          self.new_hvparams[hv_name].update(hv_dict)
1525

    
1526
    if self.op.enabled_hypervisors is not None:
1527
      self.hv_list = self.op.enabled_hypervisors
1528
    else:
1529
      self.hv_list = cluster.enabled_hypervisors
1530

    
1531
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1532
      # either the enabled list has changed, or the parameters have, validate
1533
      for hv_name, hv_params in self.new_hvparams.items():
1534
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1535
            (self.op.enabled_hypervisors and
1536
             hv_name in self.op.enabled_hypervisors)):
1537
          # either this is a new hypervisor, or its parameters have changed
1538
          hv_class = hypervisor.GetHypervisor(hv_name)
1539
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1540
          hv_class.CheckParameterSyntax(hv_params)
1541
          _CheckHVParams(self, node_list, hv_name, hv_params)
1542

    
1543
  def Exec(self, feedback_fn):
1544
    """Change the parameters of the cluster.
1545

1546
    """
1547
    if self.op.vg_name is not None:
1548
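      # an empty string here means the request is to disable LVM storage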
      new_volume = self.op.vg_name
1549
      if not new_volume:
1550
        new_volume = None
1551
      if new_volume != self.cfg.GetVGName():
1552
        self.cfg.SetVGName(new_volume)
1553
      else:
1554
        feedback_fn("Cluster LVM configuration already in desired"
1555
                    " state, not changing")
1556
    if self.op.hvparams:
1557
      self.cluster.hvparams = self.new_hvparams
1558
    if self.op.enabled_hypervisors is not None:
1559
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1560
    if self.op.beparams:
1561
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1562
    if self.op.nicparams:
1563
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1564

    
1565
    if self.op.candidate_pool_size is not None:
1566
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1567

    
1568
    self.cfg.Update(self.cluster)
1569

    
1570
    # we want to update nodes after the cluster so that if any errors
1571
    # happen, we have recorded and saved the cluster info
1572
    if self.op.candidate_pool_size is not None:
1573
      _AdjustCandidatePool(self)
1574

    
1575

    
1576
def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1577
  """Distribute additional files which are part of the cluster configuration.
1578

1579
  ConfigWriter takes care of distributing the config and ssconf files, but
1580
  there are more files which should be distributed to all nodes. This function
1581
  makes sure those are copied.
1582

1583
  @param lu: calling logical unit
1584
  @param additional_nodes: list of nodes not in the config to distribute to
1585

1586
  """
1587
  # 1. Gather target nodes
1588
  myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1589
  dist_nodes = lu.cfg.GetNodeList()
1590
  if additional_nodes is not None:
1591
    dist_nodes.extend(additional_nodes)
1592
  if myself.name in dist_nodes:
1593
    dist_nodes.remove(myself.name)
1594
  # 2. Gather files to distribute
1595
  dist_files = set([constants.ETC_HOSTS,
1596
                    constants.SSH_KNOWN_HOSTS_FILE,
1597
                    constants.RAPI_CERT_FILE,
1598
                    constants.RAPI_USERS_FILE,
1599
                   ])
1600

    
1601
  enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1602
  for hv_name in enabled_hypervisors:
1603
    hv_class = hypervisor.GetHypervisor(hv_name)
1604
    dist_files.update(hv_class.GetAncillaryFiles())
1605

    
1606
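  # files that do not exist on the master (e.g. an optional RAPI users
  # file) are silently skipped by the existence check below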
  # 3. Perform the files upload
  for fname in dist_files:
    if os.path.exists(fname):
      result = lu.rpc.call_upload_file(dist_nodes, fname)
      for to_node, to_result in result.items():
        msg = to_result.RemoteFailMsg()
        if msg:
          msg = ("Copy of file %s to node %s failed: %s" %
                 (fname, to_node, msg))
          lu.proc.LogWarning(msg)
1616

    
1617

    
1618
class LURedistributeConfig(NoHooksLU):
1619
  """Force the redistribution of cluster configuration.
1620

1621
  This is a very simple LU.
1622

1623
  """
1624
  _OP_REQP = []
1625
  REQ_BGL = False
1626

    
1627
  def ExpandNames(self):
1628
    self.needed_locks = {
1629
      locking.LEVEL_NODE: locking.ALL_SET,
1630
    }
1631
    self.share_locks[locking.LEVEL_NODE] = 1
1632

    
1633
  def CheckPrereq(self):
1634
    """Check prerequisites.
1635

1636
    """
1637

    
1638
  def Exec(self, feedback_fn):
1639
    """Redistribute the configuration.
1640

1641
    """
1642
    self.cfg.Update(self.cfg.GetClusterInfo())
1643
    _RedistributeAncillaryFiles(self)
1644

    
1645

    
1646
def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1647
  """Sleep and poll for an instance's disk to sync.
1648

1649
  """
1650
  if not instance.disks:
1651
    return True
1652

    
1653
  if not oneshot:
1654
    lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1655

    
1656
  node = instance.primary_node
1657

    
1658
  for dev in instance.disks:
1659
    lu.cfg.SetDiskID(dev, node)
1660

    
1661
  retries = 0
1662
  while True:
1663
    max_time = 0
1664
    done = True
1665
    cumul_degraded = False
1666
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1667
    if rstats.failed or not rstats.data:
1668
      lu.LogWarning("Can't get any data from node %s", node)
1669
      retries += 1
1670
      if retries >= 10:
1671
        raise errors.RemoteError("Can't contact node %s for mirror data,"
1672
                                 " aborting." % node)
1673
      time.sleep(6)
1674
      continue
1675
    rstats = rstats.data
1676
    retries = 0
1677
    for i, mstat in enumerate(rstats):
1678
      if mstat is None:
1679
        lu.LogWarning("Can't compute data for node %s/%s",
1680
                           node, instance.disks[i].iv_name)
1681
        continue
1682
      # we ignore the ldisk parameter
1683
      perc_done, est_time, is_degraded, _ = mstat
1684
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1685
      if perc_done is not None:
1686
        done = False
1687
        if est_time is not None:
1688
          rem_time = "%d estimated seconds remaining" % est_time
1689
          max_time = est_time
1690
        else:
1691
          rem_time = "no time estimate"
1692
        lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1693
                        (instance.disks[i].iv_name, perc_done, rem_time))
1694
    if done or oneshot:
1695
      break
1696

    
1697
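    # poll again after the longest reported estimate, but never sleep
    # more than 60 seconds between checks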
    time.sleep(min(60, max_time))
1698

    
1699
  if done:
1700
    lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1701
  return not cumul_degraded
1702

    
1703

    
1704
def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1705
  """Check that mirrors are not degraded.
1706

1707
  The ldisk parameter, if True, will change the test from the
1708
  is_degraded attribute (which represents overall non-ok status for
1709
  the device(s)) to the ldisk (representing the local storage status).
1710

1711
  """
1712
  lu.cfg.SetDiskID(dev, node)
1713
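  # index into the blockdev_find result: position 6 holds the local-disk
  # (ldisk) status, position 5 the overall is_degraded flag (assumed
  # layout of the remote result tuple)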
  if ldisk:
1714
    idx = 6
1715
  else:
1716
    idx = 5
1717

    
1718
  result = True
1719
  if on_primary or dev.AssembleOnSecondary():
1720
    rstats = lu.rpc.call_blockdev_find(node, dev)
1721
    msg = rstats.RemoteFailMsg()
1722
    if msg:
1723
      lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1724
      result = False
1725
    elif not rstats.payload:
1726
      lu.LogWarning("Can't find disk on node %s", node)
1727
      result = False
1728
    else:
1729
      result = result and (not rstats.payload[idx])
1730
  if dev.children:
1731
    for child in dev.children:
1732
      result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1733

    
1734
  return result
1735

    
1736

    
1737
class LUDiagnoseOS(NoHooksLU):
1738
  """Logical unit for OS diagnose/query.
1739

1740
  """
1741
  _OP_REQP = ["output_fields", "names"]
1742
  REQ_BGL = False
1743
  _FIELDS_STATIC = utils.FieldSet()
1744
  _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1745

    
1746
  def ExpandNames(self):
1747
    if self.op.names:
1748
      raise errors.OpPrereqError("Selective OS query not supported")
1749

    
1750
    _CheckOutputFields(static=self._FIELDS_STATIC,
1751
                       dynamic=self._FIELDS_DYNAMIC,
1752
                       selected=self.op.output_fields)
1753

    
1754
    # Lock all nodes, in shared mode
1755
    # Temporary removal of locks, should be reverted later
1756
    # TODO: reintroduce locks when they are lighter-weight
1757
    self.needed_locks = {}
1758
    #self.share_locks[locking.LEVEL_NODE] = 1
1759
    #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1760

    
1761
  def CheckPrereq(self):
1762
    """Check prerequisites.
1763

1764
    """
1765

    
1766
  @staticmethod
1767
  def _DiagnoseByOS(node_list, rlist):
1768
    """Remaps a per-node return list into an a per-os per-node dictionary
1769

1770
    @param node_list: a list with the names of all nodes
1771
    @param rlist: a map with node names as keys and OS objects as values
1772

1773
    @rtype: dict
1774
    @return: a dictionary with osnames as keys and as value another map, with
1775
        nodes as keys and list of OS objects as values, eg::
1776

1777
          {"debian-etch": {"node1": [<object>,...],
1778
                           "node2": [<object>,]}
1779
          }
1780

1781
    """
1782
    all_os = {}
1783
    # we build here the list of nodes that didn't fail the RPC (at RPC
1784
    # level), so that nodes with a non-responding node daemon don't
1785
    # make all OSes invalid
1786
    good_nodes = [node_name for node_name in rlist
1787
                  if not rlist[node_name].failed]
1788
    for node_name, nr in rlist.iteritems():
1789
      if nr.failed or not nr.data:
1790
        continue
1791
      for os_obj in nr.data:
1792
        if os_obj.name not in all_os:
1793
          # build a list of nodes for this os containing empty lists
1794
          # for each node in node_list
1795
          all_os[os_obj.name] = {}
1796
          for nname in good_nodes:
1797
            all_os[os_obj.name][nname] = []
1798
        all_os[os_obj.name][node_name].append(os_obj)
1799
    return all_os
1800

    
1801
  def Exec(self, feedback_fn):
1802
    """Compute the list of OSes.
1803

1804
    """
1805
    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1806
    node_data = self.rpc.call_os_diagnose(valid_nodes)
1807
    if node_data == False:
1808
      raise errors.OpExecError("Can't gather the list of OSes")
1809
    pol = self._DiagnoseByOS(valid_nodes, node_data)
1810
    output = []
1811
    for os_name, os_data in pol.iteritems():
1812
      row = []
1813
      for field in self.op.output_fields:
1814
        if field == "name":
1815
          val = os_name
1816
        elif field == "valid":
1817
          val = utils.all([osl and osl[0] for osl in os_data.values()])
1818
        elif field == "node_status":
1819
          val = {}
1820
          for node_name, nos_list in os_data.iteritems():
1821
            val[node_name] = [(v.status, v.path) for v in nos_list]
1822
        else:
1823
          raise errors.ParameterError(field)
1824
        row.append(val)
1825
      output.append(row)
1826

    
1827
    return output
1828

    
1829

    
1830
class LURemoveNode(LogicalUnit):
1831
  """Logical unit for removing a node.
1832

1833
  """
1834
  HPATH = "node-remove"
1835
  HTYPE = constants.HTYPE_NODE
1836
  _OP_REQP = ["node_name"]
1837

    
1838
  def BuildHooksEnv(self):
1839
    """Build hooks env.
1840

1841
    This doesn't run on the target node in the pre phase as a failed
1842
    node would then be impossible to remove.
1843

1844
    """
1845
    env = {
1846
      "OP_TARGET": self.op.node_name,
1847
      "NODE_NAME": self.op.node_name,
1848
      }
1849
    all_nodes = self.cfg.GetNodeList()
1850
    all_nodes.remove(self.op.node_name)
1851
    return env, all_nodes, all_nodes
1852

    
1853
  def CheckPrereq(self):
1854
    """Check prerequisites.
1855

1856
    This checks:
1857
     - the node exists in the configuration
1858
     - it does not have primary or secondary instances
1859
     - it's not the master
1860

1861
    Any errors are signalled by raising errors.OpPrereqError.
1862

1863
    """
1864
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1865
    if node is None:
1866
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)
1867

    
1868
    instance_list = self.cfg.GetInstanceList()
1869

    
1870
    masternode = self.cfg.GetMasterNode()
1871
    if node.name == masternode:
1872
      raise errors.OpPrereqError("Node is the master node,"
1873
                                 " you need to failover first.")
1874

    
1875
    for instance_name in instance_list:
1876
      instance = self.cfg.GetInstanceInfo(instance_name)
1877
      if node.name in instance.all_nodes:
1878
        raise errors.OpPrereqError("Instance %s is still running on the node,"
1879
                                   " please remove first." % instance_name)
1880
    self.op.node_name = node.name
1881
    self.node = node
1882

    
1883
  def Exec(self, feedback_fn):
1884
    """Removes the node from the cluster.
1885

1886
    """
1887
    node = self.node
1888
    logging.info("Stopping the node daemon and removing configs from node %s",
1889
                 node.name)
1890

    
1891
    self.context.RemoveNode(node.name)
1892

    
1893
    self.rpc.call_node_leave_cluster(node.name)
1894

    
1895
    # Promote nodes to master candidate as needed
1896
    _AdjustCandidatePool(self)
1897

    
1898

    
1899
class LUQueryNodes(NoHooksLU):
1900
  """Logical unit for querying nodes.
1901

1902
  """
1903
  _OP_REQP = ["output_fields", "names", "use_locking"]
1904
  REQ_BGL = False
1905
  _FIELDS_DYNAMIC = utils.FieldSet(
1906
    "dtotal", "dfree",
1907
    "mtotal", "mnode", "mfree",
1908
    "bootid",
1909
    "ctotal", "cnodes", "csockets",
1910
    )
1911

    
1912
  _FIELDS_STATIC = utils.FieldSet(
1913
    "name", "pinst_cnt", "sinst_cnt",
1914
    "pinst_list", "sinst_list",
1915
    "pip", "sip", "tags",
1916
    "serial_no",
1917
    "master_candidate",
1918
    "master",
1919
    "offline",
1920
    "drained",
1921
    )
1922

    
1923
  def ExpandNames(self):
1924
    _CheckOutputFields(static=self._FIELDS_STATIC,
1925
                       dynamic=self._FIELDS_DYNAMIC,
1926
                       selected=self.op.output_fields)
1927

    
1928
    self.needed_locks = {}
1929
    self.share_locks[locking.LEVEL_NODE] = 1
1930

    
1931
    if self.op.names:
1932
      self.wanted = _GetWantedNodes(self, self.op.names)
1933
    else:
1934
      self.wanted = locking.ALL_SET
1935

    
1936
    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1937
    self.do_locking = self.do_node_query and self.op.use_locking
1938
    if self.do_locking:
1939
      # if we don't request only static fields, we need to lock the nodes
1940
      self.needed_locks[locking.LEVEL_NODE] = self.wanted
1941

    
1942

    
1943
  def CheckPrereq(self):
1944
    """Check prerequisites.
1945

1946
    """
1947
    # The validation of the node list is done in _GetWantedNodes if the
    # list is not empty; an empty list needs no validation
1949
    pass
1950

    
1951
  def Exec(self, feedback_fn):
1952
    """Computes the list of nodes and their attributes.
1953

1954
    """
1955
    all_info = self.cfg.GetAllNodesInfo()
1956
    if self.do_locking:
1957
      nodenames = self.acquired_locks[locking.LEVEL_NODE]
1958
    elif self.wanted != locking.ALL_SET:
1959
      nodenames = self.wanted
1960
      missing = set(nodenames).difference(all_info.keys())
1961
      if missing:
1962
        raise errors.OpExecError(
1963
          "Some nodes were removed before retrieving their data: %s" % missing)
1964
    else:
1965
      nodenames = all_info.keys()
1966

    
1967
    nodenames = utils.NiceSort(nodenames)
1968
    nodelist = [all_info[name] for name in nodenames]
1969

    
1970
    # begin data gathering
1971

    
1972
    if self.do_node_query:
1973
      live_data = {}
1974
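      # query the live nodes for their current memory, disk and cpu data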
      node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1975
                                          self.cfg.GetHypervisorType())
1976
      for name in nodenames:
1977
        nodeinfo = node_data[name]
1978
        if not nodeinfo.failed and nodeinfo.data:
1979
          nodeinfo = nodeinfo.data
1980
          fn = utils.TryConvert
1981
          live_data[name] = {
1982
            "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1983
            "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1984
            "mfree": fn(int, nodeinfo.get('memory_free', None)),
1985
            "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1986
            "dfree": fn(int, nodeinfo.get('vg_free', None)),
1987
            "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1988
            "bootid": nodeinfo.get('bootid', None),
1989
            "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
1990
            "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
1991
            }
1992
        else:
1993
          live_data[name] = {}
1994
    else:
1995
      live_data = dict.fromkeys(nodenames, {})
1996

    
1997
    node_to_primary = dict([(name, set()) for name in nodenames])
1998
    node_to_secondary = dict([(name, set()) for name in nodenames])
1999

    
2000
    inst_fields = frozenset(("pinst_cnt", "pinst_list",
2001
                             "sinst_cnt", "sinst_list"))
2002
    if inst_fields & frozenset(self.op.output_fields):
2003
      instancelist = self.cfg.GetInstanceList()
2004

    
2005
      for instance_name in instancelist:
2006
        inst = self.cfg.GetInstanceInfo(instance_name)
2007
        if inst.primary_node in node_to_primary:
2008
          node_to_primary[inst.primary_node].add(inst.name)
2009
        for secnode in inst.secondary_nodes:
2010
          if secnode in node_to_secondary:
2011
            node_to_secondary[secnode].add(inst.name)
2012

    
2013
    master_node = self.cfg.GetMasterNode()
2014

    
2015
    # end data gathering
2016

    
2017
    output = []
2018
    for node in nodelist:
2019
      node_output = []
2020
      for field in self.op.output_fields:
2021
        if field == "name":
2022
          val = node.name
2023
        elif field == "pinst_list":
2024
          val = list(node_to_primary[node.name])
2025
        elif field == "sinst_list":
2026
          val = list(node_to_secondary[node.name])
2027
        elif field == "pinst_cnt":
2028
          val = len(node_to_primary[node.name])
2029
        elif field == "sinst_cnt":
2030
          val = len(node_to_secondary[node.name])
2031
        elif field == "pip":
2032
          val = node.primary_ip
2033
        elif field == "sip":
2034
          val = node.secondary_ip
2035
        elif field == "tags":
2036
          val = list(node.GetTags())
2037
        elif field == "serial_no":
2038
          val = node.serial_no
2039
        elif field == "master_candidate":
2040
          val = node.master_candidate
2041
        elif field == "master":
2042
          val = node.name == master_node
2043
        elif field == "offline":
2044
          val = node.offline
2045
        elif field == "drained":
2046
          val = node.drained
2047
        elif self._FIELDS_DYNAMIC.Matches(field):
2048
          val = live_data[node.name].get(field, None)
2049
        else:
2050
          raise errors.ParameterError(field)
2051
        node_output.append(val)
2052
      output.append(node_output)
2053

    
2054
    return output
2055

    
2056

    
2057
class LUQueryNodeVolumes(NoHooksLU):
2058
  """Logical unit for getting volumes on node(s).
2059

2060
  """
2061
  _OP_REQP = ["nodes", "output_fields"]
2062
  REQ_BGL = False
2063
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2064
  _FIELDS_STATIC = utils.FieldSet("node")
2065

    
2066
  def ExpandNames(self):
2067
    _CheckOutputFields(static=self._FIELDS_STATIC,
2068
                       dynamic=self._FIELDS_DYNAMIC,
2069
                       selected=self.op.output_fields)
2070

    
2071
    self.needed_locks = {}
2072
    self.share_locks[locking.LEVEL_NODE] = 1
2073
    if not self.op.nodes:
2074
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2075
    else:
2076
      self.needed_locks[locking.LEVEL_NODE] = \
2077
        _GetWantedNodes(self, self.op.nodes)
2078

    
2079
  def CheckPrereq(self):
2080
    """Check prerequisites.
2081

2082
    This checks that the fields required are valid output fields.
2083

2084
    """
2085
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2086

    
2087
  def Exec(self, feedback_fn):
2088
    """Computes the list of nodes and their attributes.
2089

2090
    """
2091
    nodenames = self.nodes
2092
    volumes = self.rpc.call_node_volumes(nodenames)
2093

    
2094
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
2095
             in self.cfg.GetInstanceList()]
2096

    
2097
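    # map each instance to its logical volumes per node, so that each
    # volume can be matched back to the instance owning it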
    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2098

    
2099
    output = []
2100
    for node in nodenames:
2101
      if node not in volumes or volumes[node].failed or not volumes[node].data:
2102
        continue
2103

    
2104
      node_vols = volumes[node].data[:]
2105
      node_vols.sort(key=lambda vol: vol['dev'])
2106

    
2107
      for vol in node_vols:
2108
        node_output = []
2109
        for field in self.op.output_fields:
2110
          if field == "node":
2111
            val = node
2112
          elif field == "phys":
2113
            val = vol['dev']
2114
          elif field == "vg":
2115
            val = vol['vg']
2116
          elif field == "name":
2117
            val = vol['name']
2118
          elif field == "size":
2119
            val = int(float(vol['size']))
2120
          elif field == "instance":
2121
            for inst in ilist:
2122
              if node not in lv_by_node[inst]:
2123
                continue
2124
              if vol['name'] in lv_by_node[inst][node]:
2125
                val = inst.name
2126
                break
2127
            else:
2128
              val = '-'
2129
          else:
2130
            raise errors.ParameterError(field)
2131
          node_output.append(str(val))
2132

    
2133
        output.append(node_output)
2134

    
2135
    return output
2136

    
2137

    
2138
class LUAddNode(LogicalUnit):
2139
  """Logical unit for adding node to the cluster.
2140

2141
  """
2142
  HPATH = "node-add"
2143
  HTYPE = constants.HTYPE_NODE
2144
  _OP_REQP = ["node_name"]
2145

    
2146
  def BuildHooksEnv(self):
2147
    """Build hooks env.
2148

2149
    This will run on all nodes before, and on all nodes + the new node after.
2150

2151
    """
2152
    env = {
2153
      "OP_TARGET": self.op.node_name,
2154
      "NODE_NAME": self.op.node_name,
2155
      "NODE_PIP": self.op.primary_ip,
2156
      "NODE_SIP": self.op.secondary_ip,
2157
      }
2158
    nodes_0 = self.cfg.GetNodeList()
2159
    nodes_1 = nodes_0 + [self.op.node_name, ]
2160
    return env, nodes_0, nodes_1
2161

    
2162
  def CheckPrereq(self):
2163
    """Check prerequisites.
2164

2165
    This checks:
2166
     - the new node is not already in the config
2167
     - it is resolvable
2168
     - its parameters (single/dual homed) matches the cluster
2169

2170
    Any errors are signalled by raising errors.OpPrereqError.
2171

2172
    """
2173
    node_name = self.op.node_name
2174
    cfg = self.cfg
2175

    
2176
    dns_data = utils.HostInfo(node_name)
2177

    
2178
    node = dns_data.name
2179
    primary_ip = self.op.primary_ip = dns_data.ip
2180
    secondary_ip = getattr(self.op, "secondary_ip", None)
2181
    if secondary_ip is None:
2182
      secondary_ip = primary_ip
2183
    if not utils.IsValidIP(secondary_ip):
2184
      raise errors.OpPrereqError("Invalid secondary IP given")
2185
    self.op.secondary_ip = secondary_ip
2186

    
2187
    node_list = cfg.GetNodeList()
2188
    if not self.op.readd and node in node_list:
2189
      raise errors.OpPrereqError("Node %s is already in the configuration" %
2190
                                 node)
2191
    elif self.op.readd and node not in node_list:
2192
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2193

    
2194
    for existing_node_name in node_list:
2195
      existing_node = cfg.GetNodeInfo(existing_node_name)
2196

    
2197
      if self.op.readd and node == existing_node_name:
2198
        if (existing_node.primary_ip != primary_ip or
2199
            existing_node.secondary_ip != secondary_ip):
2200
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
2201
                                     " address configuration as before")
2202
        continue
2203

    
2204
      if (existing_node.primary_ip == primary_ip or
2205
          existing_node.secondary_ip == primary_ip or
2206
          existing_node.primary_ip == secondary_ip or
2207
          existing_node.secondary_ip == secondary_ip):
2208
        raise errors.OpPrereqError("New node ip address(es) conflict with"
2209
                                   " existing node %s" % existing_node.name)
2210

    
2211
    # check that the type of the node (single versus dual homed) is the
2212
    # same as for the master
2213
    myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2214
    master_singlehomed = myself.secondary_ip == myself.primary_ip
2215
    newbie_singlehomed = secondary_ip == primary_ip
2216
    if master_singlehomed != newbie_singlehomed:
2217
      if master_singlehomed:
2218
        raise errors.OpPrereqError("The master has no private ip but the"
2219
                                   " new node has one")
2220
      else:
2221
        raise errors.OpPrereqError("The master has a private ip but the"
2222
                                   " new node doesn't have one")
2223

    
2224
    # checks reachability
2225
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2226
      raise errors.OpPrereqError("Node not reachable by ping")
2227

    
2228
    if not newbie_singlehomed:
2229
      # check reachability from my secondary ip to newbie's secondary ip
2230
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2231
                           source=myself.secondary_ip):
2232
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2233
                                   " based ping to noded port")
2234

    
2235
    cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2236
    mc_now, _ = self.cfg.GetMasterCandidateStats()
2237
    master_candidate = mc_now < cp_size
2238

    
2239
    self.new_node = objects.Node(name=node,
2240
                                 primary_ip=primary_ip,
2241
                                 secondary_ip=secondary_ip,
2242
                                 master_candidate=master_candidate,
2243
                                 offline=False, drained=False)
2244

    
2245
  def Exec(self, feedback_fn):
2246
    """Adds the new node to the cluster.
2247

2248
    """
2249
    new_node = self.new_node
2250
    node = new_node.name
2251

    
2252
    # check connectivity
2253
    result = self.rpc.call_version([node])[node]
2254
    result.Raise()
2255
    if result.data:
2256
      if constants.PROTOCOL_VERSION == result.data:
2257
        logging.info("Communication to node %s fine, sw version %s match",
2258
                     node, result.data)
2259
      else:
2260
        raise errors.OpExecError("Version mismatch master version %s,"
2261
                                 " node version %s" %
2262
                                 (constants.PROTOCOL_VERSION, result.data))
2263
    else:
2264
      raise errors.OpExecError("Cannot get version from the new node")
2265

    
2266
    # setup ssh on node
2267
    logging.info("Copy ssh key to node %s", node)
2268
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2269
    keyarray = []
2270
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2271
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2272
                priv_key, pub_key]
2273

    
2274
    for i in keyfiles:
2275
      f = open(i, 'r')
2276
      try:
2277
        keyarray.append(f.read())
2278
      finally:
2279
        f.close()
2280

    
2281
    result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2282
                                    keyarray[2],
2283
                                    keyarray[3], keyarray[4], keyarray[5])
2284

    
2285
    msg = result.RemoteFailMsg()
2286
    if msg:
2287
      raise errors.OpExecError("Cannot transfer ssh keys to the"
2288
                               " new node: %s" % msg)
2289

    
2290
    # Add node to our /etc/hosts, and add key to known_hosts
2291
    if self.cfg.GetClusterInfo().modify_etc_hosts:
2292
      utils.AddHostToEtcHosts(new_node.name)
2293

    
2294
    if new_node.secondary_ip != new_node.primary_ip:
2295
      result = self.rpc.call_node_has_ip_address(new_node.name,
2296
                                                 new_node.secondary_ip)
2297
      if result.failed or not result.data:
2298
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2299
                                 " you gave (%s). Please fix and re-run this"
2300
                                 " command." % new_node.secondary_ip)
2301

    
2302
    node_verify_list = [self.cfg.GetMasterNode()]
2303
    node_verify_param = {
2304
      'nodelist': [node],
2305
      # TODO: do a node-net-test as well?
2306
    }
2307

    
2308
    result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2309
                                       self.cfg.GetClusterName())
2310
    for verifier in node_verify_list:
2311
      if result[verifier].failed or not result[verifier].data:
2312
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
2313
                                 " for remote verification" % verifier)
2314
      if result[verifier].data['nodelist']:
2315
        for failed in result[verifier].data['nodelist']:
2316
          feedback_fn("ssh/hostname verification failed %s -> %s" %
2317
                      (verifier, result[verifier].data['nodelist'][failed]))
2318
        raise errors.OpExecError("ssh/hostname verification failed.")
2319

    
2320
    if self.op.readd:
2321
      _RedistributeAncillaryFiles(self)
2322
      self.context.ReaddNode(new_node)
2323
    else:
2324
      _RedistributeAncillaryFiles(self, additional_nodes=[node])
2325
      self.context.AddNode(new_node)
2326

    
2327

    
2328
class LUSetNodeParams(LogicalUnit):
2329
  """Modifies the parameters of a node.
2330

2331
  """
2332
  HPATH = "node-modify"
2333
  HTYPE = constants.HTYPE_NODE
2334
  _OP_REQP = ["node_name"]
2335
  REQ_BGL = False
2336

    
2337
  def CheckArguments(self):
2338
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
2339
    if node_name is None:
2340
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2341
    self.op.node_name = node_name
2342
    _CheckBooleanOpField(self.op, 'master_candidate')
2343
    _CheckBooleanOpField(self.op, 'offline')
2344
    _CheckBooleanOpField(self.op, 'drained')
2345
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2346
    if all_mods.count(None) == 3:
2347
      raise errors.OpPrereqError("Please pass at least one modification")
2348
    if all_mods.count(True) > 1:
2349
      raise errors.OpPrereqError("Can't set the node into more than one"
2350
                                 " state at the same time")
2351

    
2352
  def ExpandNames(self):
2353
    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2354

    
2355
  def BuildHooksEnv(self):
2356
    """Build hooks env.
2357

2358
    This runs on the master node.
2359

2360
    """
2361
    env = {
2362
      "OP_TARGET": self.op.node_name,
2363
      "MASTER_CANDIDATE": str(self.op.master_candidate),
2364
      "OFFLINE": str(self.op.offline),
2365
      "DRAINED": str(self.op.drained),
2366
      }
2367
    nl = [self.cfg.GetMasterNode(),
2368
          self.op.node_name]
2369
    return env, nl, nl
2370

    
2371
  def CheckPrereq(self):
2372
    """Check prerequisites.
2373

2374
    This only checks the instance list against the existing names.
2375

2376
    """
2377
    node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2378

    
2379
    if ((self.op.master_candidate == False or self.op.offline == True or
2380
         self.op.drained == True) and node.master_candidate):
2381
      # we will demote the node from master_candidate
2382
      if self.op.node_name == self.cfg.GetMasterNode():
2383
        raise errors.OpPrereqError("The master node has to be a"
2384
                                   " master candidate, online and not drained")
2385
      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2386
      num_candidates, _ = self.cfg.GetMasterCandidateStats()
2387
      if num_candidates <= cp_size:
2388
        msg = ("Not enough master candidates (desired"
2389
               " %d, new value will be %d)" % (cp_size, num_candidates-1))
2390
        if self.op.force:
2391
          self.LogWarning(msg)
2392
        else:
2393
          raise errors.OpPrereqError(msg)
2394

    
2395
    if (self.op.master_candidate == True and
2396
        ((node.offline and not self.op.offline == False) or
2397
         (node.drained and not self.op.drained == False))):
2398
      raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2399
                                 " to master_candidate" % node.name)
2400

    
2401
    return
2402

    
2403
  def Exec(self, feedback_fn):
2404
    """Modifies a node.
2405

2406
    """
2407
    node = self.node
2408

    
2409
    result = []
2410
    changed_mc = False
2411

    
2412
    if self.op.offline is not None:
2413
      node.offline = self.op.offline
2414
      result.append(("offline", str(self.op.offline)))
2415
      if self.op.offline == True:
2416
        if node.master_candidate:
2417
          node.master_candidate = False
2418
          changed_mc = True
2419
          result.append(("master_candidate", "auto-demotion due to offline"))
2420
        if node.drained:
2421
          node.drained = False
2422
          result.append(("drained", "clear drained status due to offline"))
2423

    
2424
    if self.op.master_candidate is not None:
2425
      node.master_candidate = self.op.master_candidate
2426
      changed_mc = True
2427
      result.append(("master_candidate", str(self.op.master_candidate)))
2428
      if self.op.master_candidate == False:
2429
        rrc = self.rpc.call_node_demote_from_mc(node.name)
2430
        msg = rrc.RemoteFailMsg()
2431
        if msg:
2432
          self.LogWarning("Node failed to demote itself: %s" % msg)
2433

    
2434
    if self.op.drained is not None:
2435
      node.drained = self.op.drained
2436
      result.append(("drained", str(self.op.drained)))
2437
      if self.op.drained == True:
2438
        if node.master_candidate:
2439
          node.master_candidate = False
2440
          changed_mc = True
2441
          result.append(("master_candidate", "auto-demotion due to drain"))
2442
        if node.offline:
2443
          node.offline = False
2444
          result.append(("offline", "clear offline status due to drain"))
2445

    
2446
    # this will trigger configuration file update, if needed
2447
    self.cfg.Update(node)
2448
    # this will trigger job queue propagation or cleanup
2449
    if changed_mc:
2450
      self.context.ReaddNode(node)
2451

    
2452
    return result
2453

    
2454

    
2455
class LUPowercycleNode(NoHooksLU):
2456
  """Powercycles a node.
2457

2458
  """
2459
  _OP_REQP = ["node_name", "force"]
2460
  REQ_BGL = False
2461

    
2462
  def CheckArguments(self):
2463
    node_name = self.cfg.ExpandNodeName(self.op.node_name)
2464
    if node_name is None:
2465
      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2466
    self.op.node_name = node_name
2467
    if node_name == self.cfg.GetMasterNode() and not self.op.force:
2468
      raise errors.OpPrereqError("The node is the master and the force"
2469
                                 " parameter was not set")
2470

    
2471
  def ExpandNames(self):
2472
    """Locking for PowercycleNode.
2473

2474
    This is a last-resource option and shouldn't block on other
2475
    jobs. Therefore, we grab no locks.
2476

2477
    """
2478
    self.needed_locks = {}
2479

    
2480
  def CheckPrereq(self):
2481
    """Check prerequisites.
2482

2483
    This LU has no prereqs.
2484

2485
    """
2486
    pass
2487

    
2488
  def Exec(self, feedback_fn):
2489
    """Reboots a node.
2490

2491
    """
2492
    result = self.rpc.call_node_powercycle(self.op.node_name,
2493
                                           self.cfg.GetHypervisorType())
2494
    msg = result.RemoteFailMsg()
2495
    if msg:
2496
      raise errors.OpExecError("Failed to schedule the reboot: %s" % msg)
2497
    return result.payload
2498

    
2499

    
2500
class LUQueryClusterInfo(NoHooksLU):
2501
  """Query cluster configuration.
2502

2503
  """
2504
  _OP_REQP = []
2505
  REQ_BGL = False
2506

    
2507
  def ExpandNames(self):
2508
    self.needed_locks = {}
2509

    
2510
  def CheckPrereq(self):
2511
    """No prerequsites needed for this LU.
2512

2513
    """
2514
    pass
2515

    
2516
  def Exec(self, feedback_fn):
2517
    """Return cluster config.
2518

2519
    """
2520
    cluster = self.cfg.GetClusterInfo()
2521
    result = {
2522
      "software_version": constants.RELEASE_VERSION,
2523
      "protocol_version": constants.PROTOCOL_VERSION,
2524
      "config_version": constants.CONFIG_VERSION,
2525
      "os_api_version": constants.OS_API_VERSION,
2526
      "export_version": constants.EXPORT_VERSION,
2527
      "architecture": (platform.architecture()[0], platform.machine()),
2528
      "name": cluster.cluster_name,
2529
      "master": cluster.master_node,
2530
      "default_hypervisor": cluster.default_hypervisor,
2531
      "enabled_hypervisors": cluster.enabled_hypervisors,
2532
      "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2533
                        for hypervisor in cluster.enabled_hypervisors]),
2534
      "beparams": cluster.beparams,
2535
      "nicparams": cluster.nicparams,
2536
      "candidate_pool_size": cluster.candidate_pool_size,
2537
      "master_netdev": cluster.master_netdev,
2538
      "volume_group_name": cluster.volume_group_name,
2539
      "file_storage_dir": cluster.file_storage_dir,
2540
      }
2541

    
2542
    return result
2543

    
2544

    
2545
class LUQueryConfigValues(NoHooksLU):
2546
  """Return configuration values.
2547

2548
  """
2549
  _OP_REQP = []
2550
  REQ_BGL = False
2551
  _FIELDS_DYNAMIC = utils.FieldSet()
2552
  _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2553

    
2554
  def ExpandNames(self):
2555
    self.needed_locks = {}
2556

    
2557
    _CheckOutputFields(static=self._FIELDS_STATIC,
2558
                       dynamic=self._FIELDS_DYNAMIC,
2559
                       selected=self.op.output_fields)
2560

    
2561
  def CheckPrereq(self):
2562
    """No prerequisites.
2563

2564
    """
2565
    pass
2566

    
2567
  def Exec(self, feedback_fn):
2568
    """Dump a representation of the cluster config to the standard output.
2569

2570
    """
2571
    values = []
2572
    for field in self.op.output_fields:
2573
      if field == "cluster_name":
2574
        entry = self.cfg.GetClusterName()
2575
      elif field == "master_node":
2576
        entry = self.cfg.GetMasterNode()
2577
      elif field == "drain_flag":
2578
        entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2579
      else:
2580
        raise errors.ParameterError(field)
2581
      values.append(entry)
2582
    return values
2583

    
2584

    
2585
class LUActivateInstanceDisks(NoHooksLU):
2586
  """Bring up an instance's disks.
2587

2588
  """
2589
  _OP_REQP = ["instance_name"]
2590
  REQ_BGL = False
2591

    
2592
  def ExpandNames(self):
2593
    self._ExpandAndLockInstance()
2594
    self.needed_locks[locking.LEVEL_NODE] = []
2595
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2596

    
2597
  def DeclareLocks(self, level):
2598
    if level == locking.LEVEL_NODE:
2599
      self._LockInstancesNodes()
2600

    
2601
  def CheckPrereq(self):
2602
    """Check prerequisites.
2603

2604
    This checks that the instance is in the cluster.
2605

2606
    """
2607
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2608
    assert self.instance is not None, \
2609
      "Cannot retrieve locked instance %s" % self.op.instance_name
2610
    _CheckNodeOnline(self, self.instance.primary_node)
2611

    
2612
  def Exec(self, feedback_fn):
2613
    """Activate the disks.
2614

2615
    """
2616
    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2617
    if not disks_ok:
2618
      raise errors.OpExecError("Cannot activate block devices")
2619

    
2620
    return disks_info
2621

    
2622

    
2623
def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2624
  """Prepare the block devices for an instance.
2625

2626
  This sets up the block devices on all nodes.
2627

2628
  @type lu: L{LogicalUnit}
2629
  @param lu: the logical unit on whose behalf we execute
2630
  @type instance: L{objects.Instance}
2631
  @param instance: the instance for whose disks we assemble
2632
  @type ignore_secondaries: boolean
2633
  @param ignore_secondaries: if true, errors on secondary nodes
2634
      won't result in an error return from the function
2635
  @return: False if the operation failed, otherwise a list of
2636
      (host, instance_visible_name, node_visible_name)
2637
      with the mapping from node devices to instance devices
2638

2639
  """
2640
  device_info = []
2641
  disks_ok = True
2642
  iname = instance.name
2643
  # With the two passes mechanism we try to reduce the window of
2644
  # opportunity for the race condition of switching DRBD to primary
2645
  # before handshaking occurred, but we do not eliminate it
2646

    
2647
  # The proper fix would be to wait (with some limits) until the
2648
  # connection has been made and drbd transitions from WFConnection
2649
  # into any other network-connected state (Connected, SyncTarget,
2650
  # SyncSource, etc.)
2651

    
2652
  # 1st pass, assemble on all nodes in secondary mode
2653
  for inst_disk in instance.disks:
2654
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2655
      lu.cfg.SetDiskID(node_disk, node)
2656
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2657
      msg = result.RemoteFailMsg()
2658
      if msg:
2659
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2660
                           " (is_primary=False, pass=1): %s",
2661
                           inst_disk.iv_name, node, msg)
2662
        if not ignore_secondaries:
2663
          disks_ok = False
2664

    
2665
  # FIXME: race condition on drbd migration to primary
2666

    
2667
  # 2nd pass, do only the primary node
2668
  for inst_disk in instance.disks:
2669
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2670
      if node != instance.primary_node:
2671
        continue
2672
      lu.cfg.SetDiskID(node_disk, node)
2673
      result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2674
      msg = result.RemoteFailMsg()
2675
      if msg:
2676
        lu.proc.LogWarning("Could not prepare block device %s on node %s"
2677
                           " (is_primary=True, pass=2): %s",
2678
                           inst_disk.iv_name, node, msg)
2679
        disks_ok = False
2680
    device_info.append((instance.primary_node, inst_disk.iv_name,
2681
                        result.payload))
2682

    
2683
  # leave the disks configured for the primary node
2684
  # this is a workaround that would be fixed better by
2685
  # improving the logical/physical id handling
2686
  for disk in instance.disks:
2687
    lu.cfg.SetDiskID(disk, instance.primary_node)
2688

    
2689
  return disks_ok, device_info
2690
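
# A minimal usage sketch (mirrors LUActivateInstanceDisks.Exec above):
#
#   disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
#   if not disks_ok:
#     raise errors.OpExecError("Cannot activate block devices")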

    
2691

    
2692
def _StartInstanceDisks(lu, instance, force):
2693
  """Start the disks of an instance.
2694

2695
  """
2696
  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2697
                                           ignore_secondaries=force)
2698
  if not disks_ok:
2699
    _ShutdownInstanceDisks(lu, instance)
2700
    if force is not None and not force:
2701
      lu.proc.LogWarning("", hint="If the message above refers to a"
2702
                         " secondary node,"
2703
                         " you can retry the operation using '--force'.")
2704
    raise errors.OpExecError("Disk consistency error")
2705

    
2706

    
2707
class LUDeactivateInstanceDisks(NoHooksLU):
2708
  """Shutdown an instance's disks.
2709

2710
  """
2711
  _OP_REQP = ["instance_name"]
2712
  REQ_BGL = False
2713

    
2714
  def ExpandNames(self):
2715
    self._ExpandAndLockInstance()
2716
    self.needed_locks[locking.LEVEL_NODE] = []
2717
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2718

    
2719
  def DeclareLocks(self, level):
2720
    if level == locking.LEVEL_NODE:
2721
      self._LockInstancesNodes()
2722

    
2723
  def CheckPrereq(self):
2724
    """Check prerequisites.
2725

2726
    This checks that the instance is in the cluster.
2727

2728
    """
2729
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2730
    assert self.instance is not None, \
2731
      "Cannot retrieve locked instance %s" % self.op.instance_name
2732

    
2733
  def Exec(self, feedback_fn):
2734
    """Deactivate the disks
2735

2736
    """
2737
    instance = self.instance
2738
    _SafeShutdownInstanceDisks(self, instance)
2739

    
2740

    
2741
def _SafeShutdownInstanceDisks(lu, instance):
2742
  """Shutdown block devices of an instance.
2743

2744
  This function checks if an instance is running, before calling
2745
  _ShutdownInstanceDisks.
2746

2747
  """
2748
  ins_l = lu.rpc.call_instance_list([instance.primary_node],
2749
                                      [instance.hypervisor])
2750
  ins_l = ins_l[instance.primary_node]
2751
  if ins_l.failed or not isinstance(ins_l.data, list):
2752
    raise errors.OpExecError("Can't contact node '%s'" %
2753
                             instance.primary_node)
2754

    
2755
  if instance.name in ins_l.data:
2756
    raise errors.OpExecError("Instance is running, can't shutdown"
2757
                             " block devices.")
2758

    
2759
  _ShutdownInstanceDisks(lu, instance)
2760

    
2761

    
2762
def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2763
  """Shutdown block devices of an instance.
2764

2765
  This does the shutdown on all nodes of the instance.
2766

2767
  If ignore_primary is true, errors on the primary node are ignored;
  errors on the other nodes always mark the shutdown as failed.
2769

2770
  """
2771
  all_result = True
2772
  for disk in instance.disks:
2773
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2774
      lu.cfg.SetDiskID(top_disk, node)
2775
      result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2776
      msg = result.RemoteFailMsg()
2777
      if msg:
2778
        lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2779
                      disk.iv_name, node, msg)
2780
        if not ignore_primary or node != instance.primary_node:
2781
          all_result = False
2782
  return all_result
2783

    
2784

    
2785
def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2786
  """Checks if a node has enough free memory.
2787

2788
  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.
2792

2793
  @type lu: C{LogicalUnit}
2794
  @param lu: a logical unit from which we get configuration data
2795
  @type node: C{str}
2796
  @param node: the node to check
2797
  @type reason: C{str}
2798
  @param reason: string to use in the error message
2799
  @type requested: C{int}
2800
  @param requested: the amount of memory in MiB to check for
2801
  @type hypervisor_name: C{str}
2802
  @param hypervisor_name: the hypervisor to ask for memory stats
2803
  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2804
      we cannot check the node
2805

2806
  """
2807
  nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2808
  nodeinfo[node].Raise()
2809
  free_mem = nodeinfo[node].data.get('memory_free')
2810
  if not isinstance(free_mem, int):
2811
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2812
                             " was '%s'" % (node, free_mem))
2813
  if requested > free_mem:
2814
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2815
                             " needed %s MiB, available %s MiB" %
2816
                             (node, reason, requested, free_mem))
2817

    
2818

    
2819
class LUStartupInstance(LogicalUnit):
2820
  """Starts an instance.
2821

2822
  """
2823
  HPATH = "instance-start"
2824
  HTYPE = constants.HTYPE_INSTANCE
2825
  _OP_REQP = ["instance_name", "force"]
2826
  REQ_BGL = False
2827

    
2828
  def ExpandNames(self):
2829
    self._ExpandAndLockInstance()
2830

    
2831
  def BuildHooksEnv(self):
2832
    """Build hooks env.
2833

2834
    This runs on master, primary and secondary nodes of the instance.
2835

2836
    """
2837
    env = {
2838
      "FORCE": self.op.force,
2839
      }
2840
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2841
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2842
    return env, nl, nl
2843

    
2844
  def CheckPrereq(self):
2845
    """Check prerequisites.
2846

2847
    This checks that the instance is in the cluster.
2848

2849
    """
2850
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2851
    assert self.instance is not None, \
2852
      "Cannot retrieve locked instance %s" % self.op.instance_name
2853

    
2854
    # extra beparams
2855
    self.beparams = getattr(self.op, "beparams", {})
2856
    if self.beparams:
2857
      if not isinstance(self.beparams, dict):
2858
        raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2859
                                   " dict" % (type(self.beparams), ))
2860
      # fill the beparams dict
2861
      utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2862
      self.op.beparams = self.beparams
2863

    
2864
    # extra hvparams
2865
    self.hvparams = getattr(self.op, "hvparams", {})
2866
    if self.hvparams:
2867
      if not isinstance(self.hvparams, dict):
2868
        raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2869
                                   " dict" % (type(self.hvparams), ))
2870

    
2871
      # check hypervisor parameter syntax (locally)
2872
      cluster = self.cfg.GetClusterInfo()
2873
      utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2874
      filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
2875
                                    instance.hvparams)
2876
      filled_hvp.update(self.hvparams)
2877
      hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2878
      hv_type.CheckParameterSyntax(filled_hvp)
2879
      _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2880
      self.op.hvparams = self.hvparams
2881

    
2882
    _CheckNodeOnline(self, instance.primary_node)
2883

    
2884
    bep = self.cfg.GetClusterInfo().FillBE(instance)
2885
    # check bridge existence
2886
    _CheckInstanceBridgesExist(self, instance)
2887

    
2888
    remote_info = self.rpc.call_instance_info(instance.primary_node,
2889
                                              instance.name,
2890
                                              instance.hypervisor)
2891
    remote_info.Raise()
2892
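    # only check for enough free memory if the instance is not already
    # running on its primary node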
    if not remote_info.data:
2893
      _CheckNodeFreeMemory(self, instance.primary_node,
2894
                           "starting instance %s" % instance.name,
2895
                           bep[constants.BE_MEMORY], instance.hypervisor)
2896

    
2897
  def Exec(self, feedback_fn):
2898
    """Start the instance.
2899

2900
    """
2901
    instance = self.instance
2902
    force = self.op.force
2903

    
2904
    self.cfg.MarkInstanceUp(instance.name)
2905

    
2906
    node_current = instance.primary_node
2907

    
2908
    _StartInstanceDisks(self, instance, force)
2909

    
2910
    result = self.rpc.call_instance_start(node_current, instance,
2911
                                          self.hvparams, self.beparams)
2912
    msg = result.RemoteFailMsg()
2913
    if msg:
2914
      _ShutdownInstanceDisks(self, instance)
2915
      raise errors.OpExecError("Could not start instance: %s" % msg)
2916

    
2917

    
2918
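# The reboot LU below distinguishes three reboot types: soft and hard reboots
# are handed to the node daemon in a single call_instance_reboot RPC, while a
# "full" reboot is emulated from the master as an explicit shutdown, disk
# restart and instance start (which is why only that branch touches disks).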
class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      "REBOOT_TYPE": self.op.reboot_type,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    _CheckNodeOnline(self, instance.primary_node)

    # check bridges existence
    _CheckInstanceBridgesExist(self, instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type

    node_current = instance.primary_node

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      for disk in instance.disks:
        self.cfg.SetDiskID(disk, node_current)
      result = self.rpc.call_instance_reboot(node_current, instance,
                                             reboot_type)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not reboot instance: %s" % msg)
    else:
      result = self.rpc.call_instance_shutdown(node_current, instance)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not shutdown instance for"
                                 " full reboot: %s" % msg)
      _ShutdownInstanceDisks(self, instance)
      _StartInstanceDisks(self, instance, ignore_secondaries)
      result = self.rpc.call_instance_start(node_current, instance, None, None)
      msg = result.RemoteFailMsg()
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance for"
                                 " full reboot: %s" % msg)

    self.cfg.MarkInstanceUp(instance.name)


class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    result = self.rpc.call_instance_shutdown(node_current, instance)
    msg = result.RemoteFailMsg()
    if msg:
      self.proc.LogWarning("Could not shutdown instance: %s" % msg)

    _ShutdownInstanceDisks(self, instance)


class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, instance.primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise()
    if remote_info.data:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
      result.Raise()
      if not isinstance(result.data, objects.OS):
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node" % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.Update(inst)

    _StartInstanceDisks(self, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s: %s" %
                                 (inst.name, inst.primary_node, msg))
    finally:
      _ShutdownInstanceDisks(self, inst)


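# Renaming an instance below is a multi-step operation: the name is changed in
# the configuration, the instance-level lock is swapped under the BGL, the
# file-storage directory is renamed for file-based disks, and finally the OS
# rename script is run on the primary node (a failure there only logs a
# warning, since the configuration has already been updated).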
class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    _CheckNodeOnline(self, instance.primary_node)

    if instance.admin_up:
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = self.rpc.call_instance_info(instance.primary_node,
                                              instance.name,
                                              instance.hypervisor)
    remote_info.Raise()
    if remote_info.data:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))


  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
                                                     old_file_storage_dir,
                                                     new_file_storage_dir)
      result.Raise()
      if not result.data:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result.data[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self, inst, None)
    try:
      result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
                                                 old_name)
      msg = result.RemoteFailMsg()
      if msg:
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti): %s" %
               (inst.name, inst.primary_node, msg))
        self.proc.LogWarning(msg)
    finally:
      _ShutdownInstanceDisks(self, inst)


class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    nl = [self.cfg.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logging.info("Shutting down instance %s on node %s",
                 instance.name, instance.primary_node)

    result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
    msg = result.RemoteFailMsg()
    if msg:
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance: %s" % msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, instance.primary_node, msg))

    logging.info("Removing block devices for instance %s", instance.name)

    if not _RemoveDisks(self, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logging.info("Removing instance %s out of cluster config", instance.name)

    self.cfg.RemoveInstance(instance.name)
    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name


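# Field names accepted by the query LU below are either plain strings or
# regex-based "variable" fields: for example "disk.sizes" returns the list of
# all disk sizes, while "nic.mac/0" uses the captured index to return the MAC
# of the first NIC (out-of-range indices simply yield None).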
class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names", "use_locking"]
  REQ_BGL = False
  _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
                                    "admin_state",
                                    "disk_template", "ip", "mac", "bridge",
                                    "sda_size", "sdb_size", "vcpus", "tags",
                                    "network_port", "beparams",
                                    r"(disk)\.(size)/([0-9]+)",
                                    r"(disk)\.(sizes)", "disk_usage",
                                    r"(nic)\.(mac|ip|bridge)/([0-9]+)",
                                    r"(nic)\.(macs|ips|bridges)",
                                    r"(disk|nic)\.(count)",
                                    "serial_no", "hypervisor", "hvparams",] +
                                  ["hv/%s" % name
                                   for name in constants.HVS_PARAMETERS] +
                                  ["be/%s" % name
                                   for name in constants.BES_PARAMETERS])
  _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")


  def ExpandNames(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

    self.needed_locks = {}
    self.share_locks[locking.LEVEL_INSTANCE] = 1
    self.share_locks[locking.LEVEL_NODE] = 1

    if self.op.names:
      self.wanted = _GetWantedInstances(self, self.op.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
    self.do_locking = self.do_node_query and self.op.use_locking
    if self.do_locking:
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE and self.do_locking:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.

    """
    all_info = self.cfg.GetAllInstancesInfo()
    if self.wanted == locking.ALL_SET:
      # caller didn't specify instance names, so ordering is not important
      if self.do_locking:
        instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
      else:
        instance_names = all_info.keys()
      instance_names = utils.NiceSort(instance_names)
    else:
      # caller did specify names, so we must keep the ordering
      if self.do_locking:
        tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
      else:
        tgt_set = all_info.keys()
      missing = set(self.wanted).difference(tgt_set)
      if missing:
        raise errors.OpExecError("Some instances were removed before"
                                 " retrieving their data: %s" % missing)
      instance_names = self.wanted

    instance_list = [all_info[iname] for iname in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])
    hv_list = list(set([inst.hypervisor for inst in instance_list]))

    bad_nodes = []
    off_nodes = []
    if self.do_node_query:
      live_data = {}
      node_data = self.rpc.call_all_instances_info(nodes, hv_list)
      for name in nodes:
        result = node_data[name]
        if result.offline:
          # offline nodes will be in both lists
          off_nodes.append(name)
        if result.failed:
          bad_nodes.append(name)
        else:
          if result.data:
            live_data.update(result.data)
            # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    HVPREFIX = "hv/"
    BEPREFIX = "be/"
    output = []
    for instance in instance_list:
      iout = []
      i_hv = self.cfg.GetClusterInfo().FillHV(instance)
      i_be = self.cfg.GetClusterInfo().FillBE(instance)
      for field in self.op.output_fields:
        st_match = self._FIELDS_STATIC.Matches(field)
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = instance.admin_up
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in off_nodes:
            val = "ERROR_nodeoffline"
          elif instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.admin_up:
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.admin_up:
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          idx = ord(field[2]) - ord('a')
          try:
            val = instance.FindDisk(idx).size
          except errors.OpPrereqError:
            val = None
        elif field == "disk_usage": # total disk usage per node
          disk_sizes = [{'size': disk.size} for disk in instance.disks]
          val = _ComputeDiskSize(instance.disk_template, disk_sizes)
        elif field == "tags":
          val = list(instance.GetTags())
        elif field == "serial_no":
          val = instance.serial_no
        elif field == "network_port":
          val = instance.network_port
        elif field == "hypervisor":
          val = instance.hypervisor
        elif field == "hvparams":
          val = i_hv
        elif (field.startswith(HVPREFIX) and
              field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
          val = i_hv.get(field[len(HVPREFIX):], None)
        elif field == "beparams":
          val = i_be
        elif (field.startswith(BEPREFIX) and
              field[len(BEPREFIX):] in constants.BES_PARAMETERS):
          val = i_be.get(field[len(BEPREFIX):], None)
        elif st_match and st_match.groups():
          # matches a variable list
          st_groups = st_match.groups()
          if st_groups and st_groups[0] == "disk":
            if st_groups[1] == "count":
              val = len(instance.disks)
            elif st_groups[1] == "sizes":
              val = [disk.size for disk in instance.disks]
            elif st_groups[1] == "size":
              try:
                val = instance.FindDisk(st_groups[2]).size
              except errors.OpPrereqError:
                val = None
            else:
              assert False, "Unhandled disk parameter"
          elif st_groups[0] == "nic":
            if st_groups[1] == "count":
              val = len(instance.nics)
            elif st_groups[1] == "macs":
              val = [nic.mac for nic in instance.nics]
            elif st_groups[1] == "ips":
              val = [nic.ip for nic in instance.nics]
            elif st_groups[1] == "bridges":
              val = [nic.bridge for nic in instance.nics]
            else:
              # index-based item
              nic_idx = int(st_groups[2])
              if nic_idx >= len(instance.nics):
                val = None
              else:
                if st_groups[1] == "mac":
                  val = instance.nics[nic_idx].mac
                elif st_groups[1] == "ip":
                  val = instance.nics[nic_idx].ip
                elif st_groups[1] == "bridge":
                  val = instance.nics[nic_idx].bridge
                else:
                  assert False, "Unhandled NIC parameter"
          else:
            assert False, "Unhandled variable parameter"
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output


class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    bep = self.cfg.GetClusterInfo().FillBE(instance)
    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    _CheckNodeOnline(self, target_node)
    _CheckNodeNotDrained(self, target_node)
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
                         instance.name, bep[constants.BE_MEMORY],
                         instance.hypervisor)
    # check bridge existence
    _CheckInstanceBridgesExist(self, instance, node=target_node)

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self, dev, target_node, False):
        if instance.admin_up and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logging.info("Shutting down instance %s on node %s",
                 instance.name, source_node)

    result = self.rpc.call_instance_shutdown(source_node, instance)
    msg = result.RemoteFailMsg()
    if msg:
      if self.op.ignore_consistency:
        self.proc.LogWarning("Could not shutdown instance %s on node %s."
                             " Proceeding anyway. Please make sure node"
                             " %s is down. Error details: %s",
                             instance.name, source_node, source_node, msg)
      else:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, source_node, msg))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.admin_up:
      feedback_fn("* activating the instance's disks on target node")
      logging.info("Starting instance %s on node %s",
                   instance.name, target_node)

      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      result = self.rpc.call_instance_start(target_node, instance, None, None)
      msg = result.RemoteFailMsg()
      if msg:
        _ShutdownInstanceDisks(self, instance)
        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
                                 (instance.name, target_node, msg))


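# Live migration below relies on DRBD8: the helper methods further down first
# switch the disks through standalone mode into dual-master mode, wait for
# resync, migrate the running instance, and then demote the old primary back
# to secondary/single-master mode. The "cleanup" mode reuses the same helpers
# to recover the disk state after a previously failed migration.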
class LUMigrateInstance(LogicalUnit):
  """Migrate an instance.

  This is migration without shutting down, compared to the failover,
  which is done with shutdown.

  """
  HPATH = "instance-migrate"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "live", "cleanup"]

  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self, self.instance)
    env["MIGRATE_LIVE"] = self.op.live
    env["MIGRATE_CLEANUP"] = self.op.cleanup
    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    if instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " drbd8, cannot migrate.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ConfigurationError("No secondary node but using"
                                      " drbd8 disk template")

    i_be = self.cfg.GetClusterInfo().FillBE(instance)

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
                         instance.name, i_be[constants.BE_MEMORY],
                         instance.hypervisor)

    # check bridge existence
    _CheckInstanceBridgesExist(self, instance, node=target_node)

    if not self.op.cleanup:
      _CheckNodeNotDrained(self, target_node)
      result = self.rpc.call_instance_migratable(instance.primary_node,
                                                 instance)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
                                   msg)

    self.instance = instance

  def _WaitUntilSync(self):
    """Poll with custom rpc for disk sync.

    This uses our own step-based rpc call.

    """
    self.feedback_fn("* wait until resync is done")
    all_done = False
    while not all_done:
      all_done = True
      result = self.rpc.call_drbd_wait_sync(self.all_nodes,
                                            self.nodes_ip,
                                            self.instance.disks)
      min_percent = 100
      for node, nres in result.items():
        msg = nres.RemoteFailMsg()
        if msg:
          raise errors.OpExecError("Cannot resync disks on node %s: %s" %
                                   (node, msg))
        node_done, node_percent = nres.payload
        all_done = all_done and node_done
        if node_percent is not None:
          min_percent = min(min_percent, node_percent)
      if not all_done:
        if min_percent < 100:
          self.feedback_fn("   - progress: %.1f%%" % min_percent)
        time.sleep(2)

  def _EnsureSecondary(self, node):
    """Demote a node to secondary.

    """
    self.feedback_fn("* switching node %s to secondary mode" % node)

    for dev in self.instance.disks:
      self.cfg.SetDiskID(dev, node)

    result = self.rpc.call_blockdev_close(node, self.instance.name,
                                          self.instance.disks)
    msg = result.RemoteFailMsg()
    if msg:
      raise errors.OpExecError("Cannot change disk to secondary on node %s,"
                               " error %s" % (node, msg))

  def _GoStandalone(self):
    """Disconnect from the network.

    """
    self.feedback_fn("* changing into standalone mode")
    result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
                                               self.instance.disks)
    for node, nres in result.items():
      msg = nres.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Cannot disconnect disks on node %s,"
                                 " error %s" % (node, msg))

  def _GoReconnect(self, multimaster):
    """Reconnect to the network.

    """
    if multimaster:
      msg = "dual-master"
    else:
      msg = "single-master"
    self.feedback_fn("* changing disks into %s mode" % msg)
    result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
                                           self.instance.disks,
                                           self.instance.name, multimaster)
    for node, nres in result.items():
      msg = nres.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Cannot change disks config on node %s,"
                                 " error: %s" % (node, msg))

  def _ExecCleanup(self):
    """Try to cleanup after a failed migration.

    The cleanup is done by:
      - check that the instance is running only on one node
        (and update the config if needed)
      - change disks on its secondary node to secondary
      - wait until disks are fully synchronized
      - disconnect from the network
      - change disks into single-master mode
      - wait again until disks are fully synchronized

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    # check running on only one node
    self.feedback_fn("* checking where the instance actually runs"
                     " (if this hangs, the hypervisor might be in"
                     " a bad state)")
    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
    for node, result in ins_l.items():
      result.Raise()
      if not isinstance(result.data, list):
        raise errors.OpExecError("Can't contact node '%s'" % node)

    runningon_source = instance.name in ins_l[source_node].data
    runningon_target = instance.name in ins_l[target_node].data

    if runningon_source and runningon_target:
      raise errors.OpExecError("Instance seems to be running on two nodes,"
                               " or the hypervisor is confused. You will have"
                               " to ensure manually that it runs only on one"
                               " and restart this operation.")

    if not (runningon_source or runningon_target):
      raise errors.OpExecError("Instance does not seem to be running at all."
                               " In this case, it's safer to repair by"
                               " running 'gnt-instance stop' to ensure disk"
                               " shutdown, and then restarting it.")

    if runningon_target:
      # the migration has actually succeeded, we need to update the config
      self.feedback_fn("* instance running on secondary node (%s),"
                       " updating config" % target_node)
      instance.primary_node = target_node
      self.cfg.Update(instance)
      demoted_node = source_node
    else:
      self.feedback_fn("* instance confirmed to be running on its"
                       " primary node (%s)" % source_node)
      demoted_node = target_node

    self._EnsureSecondary(demoted_node)
    try:
      self._WaitUntilSync()
    except errors.OpExecError:
      # we ignore here errors, since if the device is standalone, it
      # won't be able to sync
      pass
    self._GoStandalone()
    self._GoReconnect(False)
    self._WaitUntilSync()

    self.feedback_fn("* done")

  def _RevertDiskStatus(self):
    """Try to revert the disk status after a failed migration.

    """
    target_node = self.target_node
    try:
      self._EnsureSecondary(target_node)
      self._GoStandalone()
      self._GoReconnect(False)
      self._WaitUntilSync()
    except errors.OpExecError, err:
      self.LogWarning("Migration failed and I can't reconnect the"
                      " drives: error '%s'\n"
                      "Please look and recover the instance status" %
                      str(err))

  def _AbortMigration(self):
    """Call the hypervisor code to abort a started migration.

    """
    instance = self.instance
    target_node = self.target_node
    migration_info = self.migration_info

    abort_result = self.rpc.call_finalize_migration(target_node,
                                                    instance,
                                                    migration_info,
                                                    False)
    abort_msg = abort_result.RemoteFailMsg()
    if abort_msg:
      logging.error("Aborting migration failed on target node %s: %s" %
                    (target_node, abort_msg))
      # Don't raise an exception here, as we still have to try to revert the
      # disk status, even if this step failed.

  def _ExecMigration(self):
    """Migrate an instance.

    The migrate is done by:
      - change the disks into dual-master mode
      - wait until disks are fully synchronized again
      - migrate the instance
      - change disks on the new secondary node (the old primary) to secondary
      - wait until disks are fully synchronized
      - change disks into single-master mode

    """
    instance = self.instance
    target_node = self.target_node
    source_node = self.source_node

    self.feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      if not _CheckDiskConsistency(self, dev, target_node, False):
        raise errors.OpExecError("Disk %s is degraded or not fully"
                                 " synchronized on target node,"
                                 " aborting migration." % dev.iv_name)

    # First get the migration information from the remote node
    result = self.rpc.call_migration_info(source_node, instance)
    msg = result.RemoteFailMsg()
    if msg:
      log_err = ("Failed fetching source migration information from %s: %s" %
                 (source_node, msg))
      logging.error(log_err)
      raise errors.OpExecError(log_err)

    self.migration_info = migration_info = result.payload

    # Then switch the disks to master/master mode
    self._EnsureSecondary(target_node)
    self._GoStandalone()
    self._GoReconnect(True)
    self._WaitUntilSync()

    self.feedback_fn("* preparing %s to accept the instance" % target_node)
    result = self.rpc.call_accept_instance(target_node,
                                           instance,
                                           migration_info,
                                           self.nodes_ip[target_node])

    msg = result.RemoteFailMsg()
    if msg:
      logging.error("Instance pre-migration failed, trying to revert"
                    " disk status: %s", msg)
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
                               (instance.name, msg))

    self.feedback_fn("* migrating instance to %s" % target_node)
    time.sleep(10)
    result = self.rpc.call_instance_migrate(source_node, instance,
                                            self.nodes_ip[target_node],
                                            self.op.live)
    msg = result.RemoteFailMsg()
    if msg:
      logging.error("Instance migration failed, trying to revert"
                    " disk status: %s", msg)
      self._AbortMigration()
      self._RevertDiskStatus()
      raise errors.OpExecError("Could not migrate instance %s: %s" %
                               (instance.name, msg))
    time.sleep(10)

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    result = self.rpc.call_finalize_migration(target_node,
                                              instance,
                                              migration_info,
                                              True)
    msg = result.RemoteFailMsg()
    if msg:
      logging.error("Instance migration succeeded, but finalization failed:"
                    " %s" % msg)
      raise errors.OpExecError("Could not finalize instance migration: %s" %
                               msg)

    self._EnsureSecondary(source_node)
    self._WaitUntilSync()
    self._GoStandalone()
    self._GoReconnect(False)
    self._WaitUntilSync()

    self.feedback_fn("* done")

  def Exec(self, feedback_fn):
    """Perform the migration.

    """
    self.feedback_fn = feedback_fn

    self.source_node = self.instance.primary_node
    self.target_node = self.instance.secondary_nodes[0]
    self.all_nodes = [self.source_node, self.target_node]
    self.nodes_ip = {
      self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
      self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
      }
    if self.op.cleanup:
      return self._ExecCleanup()
    else:
      return self._ExecMigration()


def _CreateBlockDev(lu, node, instance, device, force_create,
                    info, force_open):
  """Create a tree of block devices on a given node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  @param lu: the lu on whose behalf we execute
  @param node: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @type force_create: boolean
  @param force_create: whether to force creation of this device; this
      will be changed to True whenever we find a device which has
      CreateOnSecondary() attribute
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution

  """
  if device.CreateOnSecondary():
    force_create = True

  if device.children:
    for child in device.children:
      _CreateBlockDev(lu, node, instance, child, force_create,
                      info, force_open)

  if not force_create:
    return

  _CreateSingleBlockDev(lu, node, instance, device, info, force_open)


def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
  """Create a single block device on a given node.

  This will not recurse over children of the device, so they must be
  created in advance.

  @param lu: the lu on whose behalf we execute
  @param node: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device's own Open() execution

  """
  lu.cfg.SetDiskID(device, node)
  result = lu.rpc.call_blockdev_create(node, device, device.size,
                                       instance.name, force_open, info)
  msg = result.RemoteFailMsg()
  if msg:
    raise errors.OpExecError("Can't create block device %s on"
                             " node %s for instance %s: %s" %
                             (device, node, instance.name, msg))
  if device.physical_id is None:
    device.physical_id = result.payload


def _GenerateUniqueNames(lu, exts):
  """Generate suitable LV names.

  This will generate a set of unique logical volume names, one for
  each of the given extensions.

  """
  results = []
  for val in exts:
    new_id = lu.cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results


def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
                         p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  """
  port = lu.cfg.AllocatePort()
  vgname = lu.cfg.GetVGName()
  shared_secret = lu.cfg.GenerateDRBDSecret()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port,
                                      p_minor, s_minor,
                                      shared_secret),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev


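# Illustrative note: for a DRBD8 disk the helper above builds a three-device
# tree, one LD_DRBD8 device whose children are two LD_LV volumes (the data LV
# sized as requested and a fixed 128 MB metadata LV), with the DRBD logical_id
# holding (primary, secondary, port, p_minor, s_minor, shared_secret).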
def _GenerateDiskTemplate(lu, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_info,
                          file_storage_dir, file_driver,
                          base_index):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []
  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(lu, [".disk%d" % i
                                      for i in range(disk_count)])
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
                              logical_id=(vgname, names[idx]),
                              iv_name="disk/%d" % disk_index,
                              mode=disk["mode"])
      disks.append(disk_dev)
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node, remote_node] * len(disk_info), instance_name)

    names = []
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
                                               for i in range(disk_count)]):
      names.append(lv_prefix + "_data")
      names.append(lv_prefix + "_meta")
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
                                      disk["size"], names[idx*2:idx*2+2],
                                      "disk/%d" % disk_index,
                                      minors[idx*2], minors[idx*2+1])
      disk_dev.mode = disk["mode"]
      disks.append(disk_dev)
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
                              iv_name="disk/%d" % disk_index,
                              logical_id=(file_driver,
                                          "%s/disk%d" % (file_storage_dir,
                                                         disk_index)),
                              mode=disk["mode"])
      disks.append(disk_dev)
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks


def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


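# In _CreateDisks below, f_create is True only on the primary node, so simple
# devices are created there alone, while devices that report
# CreateOnSecondary() (such as DRBD disks) force creation of themselves and
# their children on the secondary as well; force_open follows the same
# primary-only rule.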
def _CreateDisks(lu, instance):
4234
  """Create all disks for an instance.
4235

4236
  This abstracts away some work from AddInstance.
4237

4238
  @type lu: L{LogicalUnit}
4239
  @param lu: the logical unit on whose behalf we execute
4240
  @type instance: L{objects.Instance}
4241
  @param instance: the instance whose disks we should create
4242
  @rtype: boolean
4243
  @return: the success of the creation
4244

4245
  """
4246
  info = _GetInstanceInfoText(instance)
4247
  pnode = instance.primary_node
4248

    
4249
  if instance.disk_template == constants.DT_FILE:
4250
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4251
    result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4252

    
4253
    if result.failed or not result.data:
4254
      raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4255

    
4256
    if not result.data[0]:
4257
      raise errors.OpExecError("Failed to create directory '%s'" %
4258
                               file_storage_dir)
4259

    
4260
  # Note: this needs to be kept in sync with adding of disks in
4261
  # LUSetInstanceParams
4262
  for device in instance.disks:
4263
    logging.info("Creating volume %s for instance %s",
4264
                 device.iv_name, instance.name)
4265
    #HARDCODE
4266
    for node in instance.all_nodes:
4267
      f_create = node == pnode
4268
      _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4269

    
4270

    
4271
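# Illustrative note (not part of the original module): the metadata text
# computed above looks like "originstname+inst1.example.com" (hypothetical
# name).  In _CreateDisks, f_create is True only on the primary node, so the
# force-creation flag of _CreateBlockDev (cf. the "we pass force_create=True"
# comments in LUReplaceDisks below) is set only there, while on secondary
# nodes creation is left to _CreateBlockDev's per-device-type rules.
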
def _RemoveDisks(lu, instance):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should remove
  @rtype: boolean
  @return: the success of the removal

  """
  logging.info("Removing block devices for instance %s", instance.name)

  all_result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      lu.cfg.SetDiskID(disk, node)
      msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
      if msg:
        lu.LogWarning("Could not remove block device %s on node %s,"
                      " continuing anyway: %s", device.iv_name, node, msg)
        all_result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
                                                 file_storage_dir)
    if result.failed or not result.data:
      logging.error("Could not remove directory '%s'", file_storage_dir)
      all_result = False

  return all_result


def _ComputeDiskSize(disk_template, disks):
  """Compute disk size requirements in the volume group

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: sum(d["size"] for d in disks),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]

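# Worked example (illustrative, hypothetical sizes): for two disks of 10240 MB
# each, _ComputeDiskSize returns 20480 MB for DT_PLAIN but
# (10240 + 128) * 2 = 20736 MB for DT_DRBD8, since each disk needs an extra
# 128 MB for DRBD metadata; DT_DISKLESS and DT_FILE return None, i.e. no
# volume group space is required.
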
def _CheckHVParams(lu, nodenames, hvname, hvparams):
  """Hypervisor parameter validation.

  This function abstracts the hypervisor parameter validation to be
  used in both instance create and instance modify.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type nodenames: list
  @param nodenames: the list of nodes on which we should check
  @type hvname: string
  @param hvname: the name of the hypervisor we should use
  @type hvparams: dict
  @param hvparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
                                                  hvname,
                                                  hvparams)
  for node in nodenames:
    info = hvinfo[node]
    if info.offline:
      continue
    msg = info.RemoteFailMsg()
    if msg:
      raise errors.OpPrereqError("Hypervisor parameter validation"
                                 " failed on node %s: %s" % (node, msg))

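# Usage sketch (illustrative): callers pass the LU itself, the node list and
# the already syntax-checked hypervisor parameters, as LUCreateInstance does
# further below:
#
#   _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
#
# Offline nodes are skipped, so only reachable nodes can veto the parameters.
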
class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disks", "disk_template",
              "mode", "start",
              "wait_for_sync", "ip_check", "nics",
              "hvparams", "beparams"]
  REQ_BGL = False

  def _ExpandNode(self, node):
    """Expands and checks one node name.

    """
    node_full = self.cfg.ExpandNodeName(node)
    if node_full is None:
      raise errors.OpPrereqError("Unknown node %s" % node)
    return node_full

  def ExpandNames(self):
    """ExpandNames for CreateInstance.

    Figure out the right locks for instance creation.

    """
    self.needed_locks = {}

    # set optional parameters to none if they don't exist
    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    # cheap checks, mostly valid constants given

    # verify creation mode
    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    if self.op.hypervisor is None:
      self.op.hypervisor = self.cfg.GetHypervisorType()

    cluster = self.cfg.GetClusterInfo()
    enabled_hvs = cluster.enabled_hypervisors
    if self.op.hypervisor not in enabled_hvs:
      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
                                 " cluster (%s)" % (self.op.hypervisor,
                                  ",".join(enabled_hvs)))

    # check hypervisor parameter syntax (locally)
    utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
    filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
                                  self.op.hvparams)
    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
    hv_type.CheckParameterSyntax(filled_hvp)

    # fill and remember the beparams dict
    utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
    self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
                                    self.op.beparams)

    #### instance parameters check

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)
    self.op.instance_name = instance_name = hostname1.name

    # this is just a preventive check, but someone might still add this
    # instance in the meantime, and creation will fail at lock-add time
    if instance_name in self.cfg.GetInstanceList():
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    self.add_locks[locking.LEVEL_INSTANCE] = instance_name

    # NIC buildup
    self.nics = []
    for idx, nic in enumerate(self.op.nics):
      nic_mode_req = nic.get("mode", None)
      nic_mode = nic_mode_req
      if nic_mode is None:
        nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]

      # in routed mode, for the first nic, the default ip is 'auto'
      if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
        default_ip_mode = constants.VALUE_AUTO
      else:
        default_ip_mode = constants.VALUE_NONE

      # ip validity checks
      ip = nic.get("ip", default_ip_mode)
      if ip is None or ip.lower() == constants.VALUE_NONE:
        nic_ip = None
      elif ip.lower() == constants.VALUE_AUTO:
        nic_ip = hostname1.ip
      else:
        if not utils.IsValidIP(ip):
          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
                                     " like a valid IP" % ip)
        nic_ip = ip

      # TODO: check the ip for uniqueness !!
      if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
        raise errors.OpPrereqError("Routed nic mode requires an ip address")

      # MAC address verification
      mac = nic.get("mac", constants.VALUE_AUTO)
      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
        if not utils.IsValidMac(mac.lower()):
          raise errors.OpPrereqError("Invalid MAC address specified: %s" %
                                     mac)
      # bridge verification
      bridge = nic.get("bridge", None)
      link = nic.get("link", None)
      if bridge and link:
        raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
                                   " at the same time")
      elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
        raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
      elif bridge:
        link = bridge

      nicparams = {}
      if nic_mode_req:
        nicparams[constants.NIC_MODE] = nic_mode_req
      if link:
        nicparams[constants.NIC_LINK] = link

      check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
                                      nicparams)
      objects.NIC.CheckParameterSyntax(check_params)
      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))

    # disk checks/pre-build
    self.disks = []
    for disk in self.op.disks:
      mode = disk.get("mode", constants.DISK_RDWR)
      if mode not in constants.DISK_ACCESS_SET:
        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                   mode)
      size = disk.get("size", None)
      if size is None:
        raise errors.OpPrereqError("Missing disk size")
      try:
        size = int(size)
      except ValueError:
        raise errors.OpPrereqError("Invalid disk size '%s'" % size)
      self.disks.append({"size": size, "mode": mode})

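    # Illustrative note (not part of the original module): at this point
    # self.op.nics and self.op.disks are lists of plain dicts, e.g.
    # (hypothetical values):
    #
    #   nics  = [{"mode": "bridged", "link": "xen-br0", "mac": "auto"}]
    #   disks = [{"size": 10240, "mode": "rw"}]
    #
    # while self.nics (objects.NIC instances) and self.disks (normalized
    # dicts with "size" and "mode") hold the checked versions used below.
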
    # used in CheckPrereq for ip ping check
    self.check_ip = hostname1.ip

    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory path not absolute")

    ### Node/iallocator related checks
    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.op.pnode = self._ExpandNode(self.op.pnode)
      nodelist = [self.op.pnode]
      if self.op.snode is not None:
        self.op.snode = self._ExpandNode(self.op.snode)
        nodelist.append(self.op.snode)
      self.needed_locks[locking.LEVEL_NODE] = nodelist

    # in case of import lock the source node too
    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)

      if src_path is None:
        self.op.src_path = src_path = self.op.instance_name

      if src_node is None:
        self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
        self.op.src_node = None
        if os.path.isabs(src_path):
          raise errors.OpPrereqError("Importing an instance from an absolute"
                                     " path requires a source node option.")
      else:
        self.op.src_node = src_node = self._ExpandNode(src_node)
        if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
          self.needed_locks[locking.LEVEL_NODE].append(src_node)
        if not os.path.isabs(src_path):
          self.op.src_path = src_path = \
            os.path.join(constants.EXPORT_DIR, src_path)

    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    nics = [n.ToDict() for n in self.nics]
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.be_full[constants.BE_VCPUS],
                     mem_size=self.be_full[constants.BE_MEMORY],
                     disks=self.disks,
                     nics=nics,
                     hypervisor=self.op.hypervisor,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 ", ".join(ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]

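  # Illustrative note (not part of the original module): for a successful
  # IALLOCATOR_MODE_ALLOC run, ial.nodes is expected to hold exactly
  # ial.required_nodes entries; _RunAllocator above records the first as the
  # primary node and, when two nodes are required (e.g. for the DRBD8
  # template), the second as the secondary.
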
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["SRC_NODE"] = self.op.src_node
      env["SRC_PATH"] = self.op.src_path
      env["SRC_IMAGES"] = self.src_images

    env.update(_BuildInstanceHookEnv(
      name=self.op.instance_name,
      primary_node=self.op.pnode,
      secondary_nodes=self.secondaries,
      status=self.op.start,
      os_type=self.op.os_type,
      memory=self.be_full[constants.BE_MEMORY],
      vcpus=self.be_full[constants.BE_VCPUS],
      nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
      disk_template=self.op.disk_template,
      disks=[(d["size"], d["mode"]) for d in self.disks],
    ))

    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = self.op.src_node
      src_path = self.op.src_path

      if src_node is None:
        exp_list = self.rpc.call_export_list(
          self.acquired_locks[locking.LEVEL_NODE])
        found = False
        for node in exp_list:
          if not exp_list[node].failed and src_path in exp_list[node].data:
            found = True
            self.op.src_node = src_node = node
            self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
                                                       src_path)
            break
        if not found:
          raise errors.OpPrereqError("No export found for relative path %s" %
                                      src_path)

      _CheckNodeOnline(self, src_node)
      result = self.rpc.call_export_info(src_node, src_path)
      result.Raise()
      if not result.data:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      export_info = result.data
      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      # Check that the new instance doesn't have less disks than the export
      instance_disks = len(self.disks)
      export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
      if instance_disks < export_disks:
        raise errors.OpPrereqError("Not enough disks to import."
                                   " (instance: %d, export: %d)" %
                                   (instance_disks, export_disks))

      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      disk_images = []
      for idx in range(export_disks):
        option = 'disk%d_dump' % idx
        if export_info.has_option(constants.INISECT_INS, option):
          # FIXME: are the old os-es, disk sizes, etc. useful?
          export_name = export_info.get(constants.INISECT_INS, option)
          image = os.path.join(src_path, export_name)
          disk_images.append(image)
        else:
          disk_images.append(False)

      self.src_images = disk_images

      old_name = export_info.get(constants.INISECT_INS, 'name')
      # FIXME: int() here could throw a ValueError on broken exports
      exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
      if self.op.instance_name == old_name:
        for idx, nic in enumerate(self.nics):
          if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
            nic_mac_ini = 'nic%d_mac' % idx
            nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)

    # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
    # ip ping checks (we use the same ip that was resolved in ExpandNames)
    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (self.check_ip, self.op.instance_name))

    #### mac address generation
    # By generating here the mac address both the allocator and the hooks get
    # the real final mac address rather than the 'auto' or 'generate' value.
    # There is a race condition between the generation and the instance object
    # creation, which means that we know the mac is valid now, but we're not
    # sure it will be when we actually add the instance. If things go bad
    # adding the instance will abort because of a duplicate mac, and the
    # creation job will fail.
    for nic in self.nics:
      if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
        nic.mac = self.cfg.GenerateMAC()

    #### allocator run

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
    assert self.pnode is not None, \
      "Cannot retrieve locked node %s" % self.op.pnode
    if pnode.offline:
      raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
                                 pnode.name)
    if pnode.drained:
      raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
                                 pnode.name)

    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if self.op.snode is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")
      if self.op.snode == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      _CheckNodeOnline(self, self.op.snode)
      _CheckNodeNotDrained(self, self.op.snode)
      self.secondaries.append(self.op.snode)

    nodenames = [pnode.name] + self.secondaries

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.disks)

    # Check lv size requirements
    if req_size is not None:
      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                         self.op.hypervisor)
      for node in nodenames:
        info = nodeinfo[node]
        info.Raise()
        info = info.data
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)

    # os verification
    result = self.rpc.call_os_get(pnode.name, self.op.os_type)
    result.Raise()
    if not isinstance(result.data, objects.OS):
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node" % self.op.os_type)

    _CheckNicsBridgesExist(self, self.nics, self.pnode.name)

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.be_full[constants.BE_MEMORY],
                           self.op.hypervisor)

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    ht_kind = self.op.hypervisor
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    ##if self.op.vnc_bind_address is None:
    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.cfg.GetFileStorageDir(),
                                        string_file_storage_dir, instance))

    disks = _GenerateDiskTemplate(self,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries,
                                  self.disks,
                                  file_storage_dir,
                                  self.op.file_driver,
                                  0)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            nics=self.nics, disks=disks,
                            disk_template=self.op.disk_template,
                            admin_up=False,
                            network_port=network_port,
                            beparams=self.op.beparams,
                            hvparams=self.op.hvparams,
                            hypervisor=self.op.hypervisor,
                            )

    feedback_fn("* creating instance disks...")
    try:
      _CreateDisks(self, iobj)
    except errors.OpExecError:
      self.LogWarning("Device creation failed, reverting...")
      try:
        _RemoveDisks(self, iobj)
      finally:
        self.cfg.ReleaseDRBDMinors(instance)
        raise

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Declare that we don't want to remove the instance lock anymore, as we've
    # added the instance to the config
    del self.remove_locks[locking.LEVEL_INSTANCE]
    # Unlock all the nodes
    if self.op.mode == constants.INSTANCE_IMPORT:
      nodes_keep = [self.op.src_node]
      nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
                       if node != self.op.src_node]
      self.context.glm.release(locking.LEVEL_NODE, nodes_release)
      self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
    else:
      self.context.glm.release(locking.LEVEL_NODE)
      del self.acquired_locks[locking.LEVEL_NODE]

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, iobj)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self, iobj, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(self, iobj)
      self.cfg.RemoveInstance(iobj.name)
      # Make sure the instance lock gets removed
      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
        msg = result.RemoteFailMsg()
        if msg:
          raise errors.OpExecError("Could not add os for instance %s"
                                   " on node %s: %s" %
                                   (instance, pnode_name, msg))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_images = self.src_images
        cluster_name = self.cfg.GetClusterName()
        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
                                                         src_node, src_images,
                                                         cluster_name)
        import_result.Raise()
        for idx, result in enumerate(import_result.data):
          if not result:
            self.LogWarning("Could not import the image %s for instance"
                            " %s, disk %d, on node %s" %
                            (src_images[idx], instance, idx, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      iobj.admin_up = True
      self.cfg.Update(iobj)
      logging.info("Starting instance %s on node %s", instance, pnode_name)
      feedback_fn("* starting instance...")
      result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not start instance: %s" % msg)


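# Illustrative sketch (not part of the original module, values hypothetical):
# per _OP_REQP above, an instance-add opcode for this LU carries at least
#
#   instance_name="inst1.example.com", mode=constants.INSTANCE_CREATE,
#   disk_template=constants.DT_DRBD8, disks=[{"size": 10240}], nics=[{}],
#   start=True, wait_for_sync=True, ip_check=True, hvparams={}, beparams={},
#
# plus, when no iallocator is used, pnode/snode and (for INSTANCE_CREATE)
# an os_type, as checked in ExpandNames/CheckPrereq.
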
class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = self.rpc.call_instance_list([node],
                                             [instance.hypervisor])[node]
    node_insts.Raise()

    if instance.name not in node_insts.data:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logging.debug("Connecting to console of %s on %s", instance.name, node)

    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    cluster = self.cfg.GetClusterInfo()
    # beparams and hvparams are passed separately, to avoid editing the
    # instance and then saving the defaults in the instance itself.
    hvparams = cluster.FillHV(instance)
    beparams = cluster.FillBE(instance)
    console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]
  REQ_BGL = False

  def CheckArguments(self):
    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None
    if not hasattr(self.op, "iallocator"):
      self.op.iallocator = None

    # check for valid parameter combination
    cnt = [self.op.remote_node, self.op.iallocator].count(None)
    if self.op.mode == constants.REPLACE_DISK_CHG:
      if cnt == 2:
        raise errors.OpPrereqError("When changing the secondary either an"
                                   " iallocator script must be used or the"
                                   " new node given")
      elif cnt == 0:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
    else: # not replacing the secondary
      if cnt != 2:
        raise errors.OpPrereqError("The iallocator and new node options can"
                                   " be used only when changing the"
                                   " secondary node")

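  # Illustrative summary (not part of the original module) of the parameter
  # combinations accepted by CheckArguments above:
  #
  #   mode == REPLACE_DISK_CHG: exactly one of remote_node / iallocator
  #   any other mode:           neither remote_node nor iallocator
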
  def ExpandNames(self):
    self._ExpandAndLockInstance()

    if self.op.iallocator is not None:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    elif self.op.remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.op.remote_node = remote_node
      # Warning: do not remove the locking of the new secondary here
      # unless DRBD8.AddChildren is changed to work in parallel;
      # currently it doesn't since parallel invocations of
      # FindUnusedMinor will conflict
      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    # If we're not already locking all nodes in the set we have to declare the
    # instance's primary/secondary nodes.
    if (level == locking.LEVEL_NODE and
        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
      self._LockInstancesNodes()

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    self.LogInfo("Selected new secondary for the instance: %s",
                 self.op.remote_node)

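  # Illustrative note (not part of the original module): in RELOC mode the
  # allocator is expected to return a single node, which _RunAllocator above
  # installs as the new secondary via self.op.remote_node.
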
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    self.instance = instance

    if instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
                                 " instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    if self.op.iallocator is not None:
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      raise errors.OpPrereqError("The specified node is already the"
                                 " secondary node of the instance.")

    if self.op.mode == constants.REPLACE_DISK_PRI:
      n1 = self.tgt_node = instance.primary_node
      n2 = self.oth_node = self.sec_node
    elif self.op.mode == constants.REPLACE_DISK_SEC:
      n1 = self.tgt_node = self.sec_node
      n2 = self.oth_node = instance.primary_node
    elif self.op.mode == constants.REPLACE_DISK_CHG:
      n1 = self.new_node = remote_node
      n2 = self.oth_node = instance.primary_node
      self.tgt_node = self.sec_node
      _CheckNodeNotDrained(self, remote_node)
    else:
      raise errors.ProgrammerError("Unhandled disk replace mode")

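    # Illustrative summary (not part of the original module) of the node
    # roles assigned above:
    #
    #   REPLACE_DISK_PRI: tgt_node = primary,   oth_node = secondary
    #   REPLACE_DISK_SEC: tgt_node = secondary, oth_node = primary
    #   REPLACE_DISK_CHG: new_node = remote,    oth_node = primary,
    #                     tgt_node = secondary (the node being replaced)
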
    _CheckNodeOnline(self, n1)
    _CheckNodeOnline(self, n2)

    if not self.op.disks:
      self.op.disks = range(len(instance.disks))

    for disk_idx in self.op.disks:
      instance.FindDisk(disk_idx)

  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for dbrd8.
5164

5165
    The algorithm for replace is quite complicated:

      1. for each disk to be replaced:

        1. create new LVs on the target node with unique names
        1. detach old LVs from the drbd device
        1. rename old LVs to name_replaced.<time_t>
        1. rename new LVs to old LVs
        1. attach the new LVs (with the old names now) to the drbd device

      1. wait for sync across all devices

      1. for each modified disk:

        1. remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = self.rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results[node]
      if res.failed or not res.data or my_vg not in res.data:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking disk/%d on %s" % (idx, node))
        cfg.SetDiskID(dev, node)
        result = self.rpc.call_blockdev_find(node, dev)
        msg = result.RemoteFailMsg()
        if not msg and not result.payload:
          msg = "disk not found"
        if msg:
          raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
                                   (idx, node, msg))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      info("checking disk/%d consistency on %s" % (idx, oth_node))
      if not _CheckDiskConsistency(self, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".disk%d_%s" % (idx, suf)
                  for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(self, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # we pass force_create=True to force the LVM creation
      for new_lv in new_lvs:
        _CreateBlockDev(self, tgt_node, instance, new_lv, True,
                        _GetInstanceInfoText(instance), False)

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s: %s" %
                                 (tgt_node, dev.iv_name, msg))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        result = self.rpc.call_blockdev_find(tgt_node, to_ren)
        if not result.RemoteFailMsg() and result.payload:
          # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Can't rename old LVs on node %s: %s" %
                                 (tgt_node, msg))
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      result = self.rpc.call_blockdev_rename(tgt_node, rlist)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Can't rename new LVs on node %s: %s" %
                                 (tgt_node, msg))

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
      msg = result.RemoteFailMsg()
      if msg:
        for new_lv in new_lvs:
          msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
          if msg:
            warning("Can't rollback device %s: %s", dev, msg,
                    hint="cleanup manually the unused logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(self, instance, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      result = self.rpc.call_blockdev_find(instance.primary_node, dev)
      msg = result.RemoteFailMsg()
      if not msg and not result.payload:
        msg = "disk not found"
      if msg:
        raise errors.OpExecError("Can't find DRBD device %s: %s" %
                                 (name, msg))
      if result.payload[5]:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
        if msg:
          warning("Can't remove old LV: %s" % msg,
                  hint="manually remove unused LVs")
          continue

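  # Illustrative note (not part of the original module): the double rename in
  # step 4 above leaves the freshly created LVs carrying the original LV
  # names, so the DRBD device re-attaches them under unchanged logical_ids,
  # while the replaced LVs are parked as "<name>_replaced-<timestamp>" until
  # step 6 removes them.
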
  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    # start of work
    cfg = self.cfg
    old_node = self.tgt_node
    new_node = self.new_node
    pri_node = instance.primary_node
    nodes_ip = {
      old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
      new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
      pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
      }

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = self.rpc.call_vg_list([pri_node, new_node])
    for node in pri_node, new_node:
      res = results[node]
      if res.failed or not res.data or my_vg not in res.data:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      info("checking disk/%d on %s" % (idx, pri_node))
      cfg.SetDiskID(dev, pri_node)
      result = self.rpc.call_blockdev_find(pri_node, dev)
      msg = result.RemoteFailMsg()
      if not msg and not result.payload:
        msg = "disk not found"
      if msg:
        raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
                                 (idx, pri_node, msg))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for idx, dev in enumerate(instance.disks):
      if idx not in self.op.disks:
        continue
      info("checking disk/%d consistency on %s" % (idx, pri_node))
      if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
        raise errors.OpExecError("Primary node (%s) has degraded storage,"
                                 " unsafe to replace the secondary" %
                                 pri_node)

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for idx, dev in enumerate(instance.disks):
      info("adding new local storage on %s for disk/%d" %
           (new_node, idx))
      # we pass force_create=True to force LVM creation
      for new_lv in dev.children:
        _CreateBlockDev(self, new_node, instance, new_lv, True,
                        _GetInstanceInfoText(instance), False)

    # Step 4: drbd minors and drbd setup changes
    # after this, we must manually remove the drbd minors on both the
5438
    # error and the success paths
5439
    minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5440
                                   instance.name)
5441
    logging.debug("Allocated minors %s" % (minors,))
5442
    self.proc.LogStep(4, steps_total, "changing drbd configuration")
5443
    for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5444
      size = dev.size
5445
      info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5446
      # create new devices on new_node; note that we create two IDs:
5447
      # one without port, so the drbd will be activated without
5448
      # networking information on the new node at this stage, and one
5449
      # with network, for the latter activation in step 4
5450
      (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5451
      if pri_node == o_node1:
5452
        p_minor = o_minor1
5453
      else:
5454
        p_minor = o_minor2
5455

    
5456
      new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5457
      new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)

      iv_names[idx] = (dev, dev.children, new_net_id)
      logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
                    new_net_id)
      new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                              logical_id=new_alone_id,
                              children=dev.children)
      try:
        _CreateSingleBlockDev(self, new_node, instance, new_drbd,
                              _GetInstanceInfoText(instance), False)
      except errors.GenericError:
        self.cfg.ReleaseDRBDMinors(instance.name)
        raise

    for idx, dev in enumerate(instance.disks):
      # we have new devices, shutdown the drbd on the old secondary
      info("shutting down drbd for disk/%d on old node" % idx)
      cfg.SetDiskID(dev, old_node)
      msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
      if msg:
        warning("Failed to shutdown drbd for disk/%d on old node: %s" %
                (idx, msg),
                hint="Please cleanup this device manually as soon as possible")

    info("detaching primary drbds from the network (=> standalone)")
    result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
                                               instance.disks)[pri_node]

    msg = result.RemoteFailMsg()
    if msg:
      # detaches didn't succeed (unlikely)
      self.cfg.ReleaseDRBDMinors(instance.name)
      raise errors.OpExecError("Can't detach the disks from the network on"
                               " old node: %s" % (msg,))

    # if we managed to detach at least one, we update all the disks of
    # the instance to point to the new secondary
    info("updating instance configuration")
    for dev, _, new_logical_id in iv_names.itervalues():
      dev.logical_id = new_logical_id
      cfg.SetDiskID(dev, pri_node)
    cfg.Update(instance)

    # and now perform the drbd attach
    info("attaching primary drbds to new secondary (standalone => connected)")
    result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
                                           instance.disks, instance.name,
                                           False)
    for to_node, to_result in result.items():
      msg = to_result.RemoteFailMsg()
      if msg:
        warning("can't attach drbd disks on node %s: %s", to_node, msg,
                hint="please do a gnt-instance info to see the"
                " status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(self, instance, unlock=True)

    # so check manually all the devices
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      result = self.rpc.call_blockdev_find(pri_node, dev)
      msg = result.RemoteFailMsg()
      if not msg and not result.payload:
        msg = "disk not found"
      if msg:
        raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
                                 (idx, msg))
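      # descriptive note: index 5 of the status tuple returned by
      # blockdev_find is the is_degraded flag of the device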
      if result.payload[5]:
        raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for idx, (dev, old_lvs, _) in iv_names.iteritems():
      info("remove logical volumes for disk/%d" % idx)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
        if msg:
          warning("Can't remove LV on old secondary: %s", msg,
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance

    # Activate the instance disks if we're replacing them on a down instance
    if not instance.admin_up:
      _StartInstanceDisks(self, instance, True)

    if self.op.mode == constants.REPLACE_DISK_CHG:
      fn = self._ExecD8Secondary
    else:
      fn = self._ExecD8DiskOnly

    ret = fn(feedback_fn)

    # Deactivate the instance disks if we're replacing them on a down instance
    if not instance.admin_up:
      _SafeShutdownInstanceDisks(self, instance)

    return ret


class LUGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
  REQ_BGL = False

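  # Rough sketch of how this LU is driven (assuming the matching
  # opcodes.OpGrowDisk definition with the fields from _OP_REQP above;
  # values are illustrative):
  #   op = opcodes.OpGrowDisk(instance_name="inst1.example.com", disk=0,
  #                           amount=1024, wait_for_sync=True)
  # "amount" is in MiB and is added on top of the current disk size.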
  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [
      self.cfg.GetMasterNode(),
      self.instance.primary_node,
      ]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    nodenames = list(instance.all_nodes)
    for node in nodenames:
      _CheckNodeOnline(self, node)


    self.instance = instance

    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing.")

    self.disk = instance.FindDisk(self.op.disk)

    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
                                       instance.hypervisor)
    for node in nodenames:
      info = nodeinfo[node]
      if info.failed or not info.data:
        raise errors.OpPrereqError("Cannot get current information"
                                   " from node '%s'" % node)
      vg_free = info.data.get('vg_free', None)
      if not isinstance(vg_free, int):
        raise errors.OpPrereqError("Can't compute free disk space on"
                                   " node %s" % node)
      if self.op.amount > vg_free:
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                   " %d MiB available, %d MiB required" %
                                   (node, vg_free, self.op.amount))

  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    disk = self.disk
    for node in instance.all_nodes:
      self.cfg.SetDiskID(disk, node)
      result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Grow request failed to node %s: %s" %
                                 (node, msg))
    disk.RecordGrow(self.op.amount)
    self.cfg.Update(instance)
    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self, instance)
      if disk_abort:
        self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
                             " status.\nPlease check the instance.")


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances", "static"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks = dict(((i, 1) for i in locking.LEVELS))

    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")

    if self.op.instances:
      self.wanted_names = []
      for name in self.op.instances:
        full_name = self.cfg.ExpandInstanceName(name)
        if full_name is None:
          raise errors.OpPrereqError("Instance '%s' not known" % name)
        self.wanted_names.append(full_name)
      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
    else:
      self.wanted_names = None
      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]

    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                             in self.wanted_names]
    return

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    static = self.op.static
    if not static:
      self.cfg.SetDiskID(dev, instance.primary_node)
      dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
      if dev_pstatus.offline:
        dev_pstatus = None
      else:
        msg = dev_pstatus.RemoteFailMsg()
        if msg:
          raise errors.OpExecError("Can't compute disk status for %s: %s" %
                                   (instance.name, msg))
        dev_pstatus = dev_pstatus.payload
    else:
      dev_pstatus = None

    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode and not static:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
      if dev_sstatus.offline:
        dev_sstatus = None
      else:
        msg = dev_sstatus.RemoteFailMsg()
        if msg:
          raise errors.OpExecError("Can't compute disk status for %s: %s" %
                                   (instance.name, msg))
        dev_sstatus = dev_sstatus.payload
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      "mode": dev.mode,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}

    cluster = self.cfg.GetClusterInfo()

    for instance in self.wanted_instances:
      if not self.op.static:
        remote_info = self.rpc.call_instance_info(instance.primary_node,
                                                  instance.name,
                                                  instance.hypervisor)
        remote_info.Raise()
        remote_info = remote_info.data
        if remote_info and "state" in remote_info:
          remote_state = "up"
        else:
          remote_state = "down"
      else:
        remote_state = None
      if instance.admin_up:
        config_state = "up"
      else:
        config_state = "down"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "hypervisor": instance.hypervisor,
        "network_port": instance.network_port,
        "hv_instance": instance.hvparams,
        "hv_actual": cluster.FillHV(instance),
        "be_instance": instance.beparams,
        "be_actual": cluster.FillBE(instance),
        }

      result[instance.name] = idict

    return result


class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def CheckArguments(self):
    if not hasattr(self.op, 'nics'):
      self.op.nics = []
    if not hasattr(self.op, 'disks'):
      self.op.disks = []
    if not hasattr(self.op, 'beparams'):
      self.op.beparams = {}
    if not hasattr(self.op, 'hvparams'):
      self.op.hvparams = {}
    self.op.force = getattr(self.op, "force", False)
    if not (self.op.nics or self.op.disks or
            self.op.hvparams or self.op.beparams):
      raise errors.OpPrereqError("No changes submitted")

    # Disk validation
    disk_addremove = 0
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        disk_addremove += 1
        continue
      elif disk_op == constants.DDM_ADD:
        disk_addremove += 1
      else:
        if not isinstance(disk_op, int):
          raise errors.OpPrereqError("Invalid disk index")
      if disk_op == constants.DDM_ADD:
        mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
        if mode not in constants.DISK_ACCESS_SET:
          raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
        size = disk_dict.get('size', None)
        if size is None:
          raise errors.OpPrereqError("Required disk parameter size missing")
        try:
          size = int(size)
        except ValueError, err:
          raise errors.OpPrereqError("Invalid disk size parameter: %s" %
                                     str(err))
        disk_dict['size'] = size
      else:
        # modification of disk
        if 'size' in disk_dict:
          raise errors.OpPrereqError("Disk size change not possible, use"
                                     " grow-disk")

    if disk_addremove > 1:
      raise errors.OpPrereqError("Only one disk add or remove operation"
                                 " supported at a time")

    # NIC validation
    nic_addremove = 0
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        nic_addremove += 1
        continue
      elif nic_op == constants.DDM_ADD:
        nic_addremove += 1
      else:
        if not isinstance(nic_op, int):
          raise errors.OpPrereqError("Invalid nic index")

      # nic_dict should be a dict
      nic_ip = nic_dict.get('ip', None)
      if nic_ip is not None:
        if nic_ip.lower() == constants.VALUE_NONE:
          nic_dict['ip'] = None
        else:
          if not utils.IsValidIP(nic_ip):
            raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)

      nic_bridge = nic_dict.get('bridge', None)
      nic_link = nic_dict.get('link', None)
      if nic_bridge and nic_link:
        raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
                                   " at the same time")
      elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
        nic_dict['bridge'] = None
      elif nic_link and nic_link.lower() == constants.VALUE_NONE:
        nic_dict['link'] = None

      if nic_op == constants.DDM_ADD:
        nic_mac = nic_dict.get('mac', None)
        if nic_mac is None:
          nic_dict['mac'] = constants.VALUE_AUTO

      if 'mac' in nic_dict:
        nic_mac = nic_dict['mac']
        if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          if not utils.IsValidMac(nic_mac):
            raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
        if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
          raise errors.OpPrereqError("'auto' is not a valid MAC address when"
                                     " modifying an existing nic")

    if nic_addremove > 1:
      raise errors.OpPrereqError("Only one NIC add or remove operation"
                                 " supported at a time")

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if constants.BE_MEMORY in self.be_new:
      args['memory'] = self.be_new[constants.BE_MEMORY]
    if constants.BE_VCPUS in self.be_new:
      args['vcpus'] = self.be_new[constants.BE_VCPUS]
    # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
    # information at all.
    if self.op.nics:
      args['nics'] = []
      nic_override = dict(self.op.nics)
      for idx, nic in enumerate(self.instance.nics):
        if idx in nic_override:
          this_nic_override = nic_override[idx]
        else:
          this_nic_override = {}
        if 'ip' in this_nic_override:
          ip = this_nic_override['ip']
        else:
          ip = nic.ip
        if 'bridge' in this_nic_override:
          bridge = this_nic_override['bridge']
        else:
          bridge = nic.bridge
        if 'mac' in this_nic_override:
          mac = this_nic_override['mac']
        else:
          mac = nic.mac
        args['nics'].append((ip, bridge, mac))
      if constants.DDM_ADD in nic_override:
        ip = nic_override[constants.DDM_ADD].get('ip', None)
        bridge = nic_override[constants.DDM_ADD]['bridge']
        mac = nic_override[constants.DDM_ADD]['mac']
        args['nics'].append((ip, bridge, mac))
      elif constants.DDM_REMOVE in nic_override:
        del args['nics'][-1]

    env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return env, nl, nl

  def _GetUpdatedParams(self, old_params, update_dict,
                        default_values, parameter_types):
    """Return the new params dict for the given params.

    @type old_params: dict
    @param old_params: old parameters
    @type update_dict: dict
    @param update_dict: dict containing new parameter values,
                       or constants.VALUE_DEFAULT to reset the
                       parameter to its default value
    @type default_values: dict
    @param default_values: default values for the filled parameters
    @type parameter_types: dict
    @param parameter_types: dict mapping target dict keys to types
                            in constants.ENFORCEABLE_TYPES
    @rtype: (dict, dict)
    @return: (new_parameters, filled_parameters)

    """
    params_copy = copy.deepcopy(old_params)
    for key, val in update_dict.iteritems():
      if val == constants.VALUE_DEFAULT:
        try:
          del params_copy[key]
        except KeyError:
          pass
      else:
        params_copy[key] = val
    utils.ForceDictType(params_copy, parameter_types)
    params_filled = objects.FillDict(default_values, params_copy)
    return (params_copy, params_filled)

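  # Illustration of the merge semantics above (hypothetical values):
  #   old_params  = {"kernel_args": "ro", "acpi": False}
  #   update_dict = {"acpi": constants.VALUE_DEFAULT, "boot_order": "cd"}
  # yields new_parameters {"kernel_args": "ro", "boot_order": "cd"} (acpi
  # falls back to its default), while filled_parameters additionally
  # contains every key from default_values.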
  def CheckPrereq(self):
    """Check prerequisites.

    This checks the requested parameter changes against the instance's
    current state and the cluster configuration.

    """
    force = self.force = self.op.force

    # checking the new params on the primary/secondary nodes

    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    cluster = self.cluster = self.cfg.GetClusterInfo()
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    pnode = instance.primary_node
    nodelist = list(instance.all_nodes)

    # hvparams processing
    if self.op.hvparams:
      i_hvdict, hv_new = self._GetUpdatedParams(
                             instance.hvparams, self.op.hvparams,
                             cluster.hvparams[instance.hypervisor],
                             constants.HVS_PARAMETER_TYPES)
      # local check
      hypervisor.GetHypervisor(
        instance.hypervisor).CheckParameterSyntax(hv_new)
      _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
      self.hv_new = hv_new # the new actual values
      self.hv_inst = i_hvdict # the new dict (without defaults)
    else:
      self.hv_new = self.hv_inst = {}

    # beparams processing
    if self.op.beparams:
      i_bedict, be_new = self._GetUpdatedParams(
                             instance.beparams, self.op.beparams,
                             cluster.beparams[constants.PP_DEFAULT],
                             constants.BES_PARAMETER_TYPES)
      self.be_new = be_new # the new actual values
      self.be_inst = i_bedict # the new dict (without defaults)
    else:
      self.be_new = self.be_inst = {}

    self.warn = []

    if constants.BE_MEMORY in self.op.beparams and not self.force:
      mem_check_list = [pnode]
      if be_new[constants.BE_AUTO_BALANCE]:
        # either we changed auto_balance to yes or it was from before
        mem_check_list.extend(instance.secondary_nodes)
      instance_info = self.rpc.call_instance_info(pnode, instance.name,
                                                  instance.hypervisor)
      nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
                                         instance.hypervisor)
      if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
        # Assume the primary node is unreachable and go ahead
        self.warn.append("Can't get info from primary node %s" % pnode)
      else:
        if not instance_info.failed and instance_info.data:
          current_mem = int(instance_info.data['memory'])
        else:
          # Assume instance not running
          # (there is a slight race condition here, but it's not very probable,
          # and we have no other way to check)
          current_mem = 0
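        # Shortfall = requested memory, minus what the instance already uses,
        # minus what the node reports free; a positive value means the
        # instance could not be (re)started with the new setting.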
        miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
                    nodeinfo[pnode].data['memory_free'])
        if miss_mem > 0:
          raise errors.OpPrereqError("This change will prevent the instance"
                                     " from starting, due to %d MB of memory"
                                     " missing on its primary node" % miss_mem)

      if be_new[constants.BE_AUTO_BALANCE]:
        for node, nres in nodeinfo.iteritems():
          if node not in instance.secondary_nodes:
            continue
          if nres.failed or not isinstance(nres.data, dict):
            self.warn.append("Can't get info from secondary node %s" % node)
          elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
            self.warn.append("Not enough memory to failover instance to"
                             " secondary node %s" % node)

    # NIC processing
    self.nic_pnew = {}
    self.nic_pinst = {}
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        if not instance.nics:
          raise errors.OpPrereqError("Instance has no NICs, cannot remove")
        continue
      if nic_op != constants.DDM_ADD:
        # an existing nic
        if nic_op < 0 or nic_op >= len(instance.nics):
          raise errors.OpPrereqError("Invalid NIC index %s, valid values"
                                     " are 0 to %d" %
                                     (nic_op, len(instance.nics)))
        old_nic_params = instance.nics[nic_op].nicparams
        old_nic_ip = instance.nics[nic_op].ip
      else:
        old_nic_params = {}
        old_nic_ip = None

      update_params_dict = dict([(key, nic_dict[key])
                                 for key in constants.NICS_PARAMETERS
                                 if key in nic_dict])

      if 'bridge' in nic_dict:
        update_params_dict[constants.NIC_LINK] = nic_dict['bridge']

      new_nic_params, new_filled_nic_params = \
          self._GetUpdatedParams(old_nic_params, update_params_dict,
                                 cluster.nicparams[constants.PP_DEFAULT],
                                 constants.NICS_PARAMETER_TYPES)
      objects.NIC.CheckParameterSyntax(new_filled_nic_params)
      self.nic_pinst[nic_op] = new_nic_params
      self.nic_pnew[nic_op] = new_filled_nic_params
      new_nic_mode = new_filled_nic_params[constants.NIC_MODE]

      if new_nic_mode == constants.NIC_MODE_BRIDGED:
        nic_bridge = new_filled_nic_params[constants.NIC_LINK]
        result = self.rpc.call_bridges_exist(pnode, [nic_bridge])
        result.Raise()
        if not result.data:
          msg = ("Bridge '%s' doesn't exist on one of"
                 " the instance nodes" % nic_bridge)
          if self.force:
            self.warn.append(msg)
          else:
            raise errors.OpPrereqError(msg)
      if new_nic_mode == constants.NIC_MODE_ROUTED:
        if 'ip' in nic_dict:
          nic_ip = nic_dict['ip']
        else:
          nic_ip = old_nic_ip
        if nic_ip is None:
          raise errors.OpPrereqError('Cannot set the nic ip to None'
                                     ' on a routed nic')
      if 'mac' in nic_dict:
        nic_mac = nic_dict['mac']
        if nic_mac is None:
          raise errors.OpPrereqError('Cannot set the nic mac to None')
        elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
          # otherwise generate the mac
          nic_dict['mac'] = self.cfg.GenerateMAC()
        else:
          # or validate/reserve the current one
          if self.cfg.IsMacInUse(nic_mac):
            raise errors.OpPrereqError("MAC address %s already in use"
                                       " in cluster" % nic_mac)

    # DISK processing
    if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Disk operations not supported for"
                                 " diskless instances")
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        if len(instance.disks) == 1:
          raise errors.OpPrereqError("Cannot remove the last disk of"
                                     " an instance")
        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
        ins_l = ins_l[pnode]
        if ins_l.failed or not isinstance(ins_l.data, list):
          raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
        if instance.name in ins_l.data:
          raise errors.OpPrereqError("Instance is running, can't remove"
                                     " disks.")

      if (disk_op == constants.DDM_ADD and
          len(instance.disks) >= constants.MAX_DISKS):
        raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
                                   " add more" % constants.MAX_DISKS)
      if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
        # an existing disk
        if disk_op < 0 or disk_op >= len(instance.disks):
          raise errors.OpPrereqError("Invalid disk index %s, valid values"
                                     " are 0 to %d" %
                                     (disk_op, len(instance.disks)))

    return

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    # Process here the warnings from CheckPrereq, as we don't have a
    # feedback_fn there.
    for warn in self.warn:
      feedback_fn("WARNING: %s" % warn)

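    # every change applied below is recorded as a (parameter, value) pair,
    # e.g. ("be/memory", 512) or ("disk/1", "add:size=1024,mode=rw")
    # (illustrative values), and the list is returned for feedback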
    result = []
    instance = self.instance
    cluster = self.cluster
    # disk changes
    for disk_op, disk_dict in self.op.disks:
      if disk_op == constants.DDM_REMOVE:
        # remove the last disk
        device = instance.disks.pop()
        device_idx = len(instance.disks)
        for node, disk in device.ComputeNodeTree(instance.primary_node):
          self.cfg.SetDiskID(disk, node)
          msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
          if msg:
            self.LogWarning("Could not remove disk/%d on node %s: %s,"
                            " continuing anyway", device_idx, node, msg)
        result.append(("disk/%d" % device_idx, "remove"))
      elif disk_op == constants.DDM_ADD:
        # add a new disk
        if instance.disk_template == constants.DT_FILE:
          file_driver, file_path = instance.disks[0].logical_id
          file_path = os.path.dirname(file_path)
        else:
          file_driver = file_path = None
        disk_idx_base = len(instance.disks)
        new_disk = _GenerateDiskTemplate(self,
                                         instance.disk_template,
                                         instance.name, instance.primary_node,
                                         instance.secondary_nodes,
                                         [disk_dict],
                                         file_path,
                                         file_driver,
                                         disk_idx_base)[0]
        instance.disks.append(new_disk)
        info = _GetInstanceInfoText(instance)

        logging.info("Creating volume %s for instance %s",
                     new_disk.iv_name, instance.name)
        # Note: this needs to be kept in sync with _CreateDisks
        #HARDCODE
        for node in instance.all_nodes:
          f_create = node == instance.primary_node
          try:
            _CreateBlockDev(self, node, instance, new_disk,
                            f_create, info, f_create)
          except errors.OpExecError, err:
            self.LogWarning("Failed to create volume %s (%s) on"
                            " node %s: %s",
                            new_disk.iv_name, new_disk, node, err)
        result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
                       (new_disk.size, new_disk.mode)))
      else:
        # change a given disk
        instance.disks[disk_op].mode = disk_dict['mode']
        result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
    # NIC changes
    for nic_op, nic_dict in self.op.nics:
      if nic_op == constants.DDM_REMOVE:
        # remove the last nic
        del instance.nics[-1]
        result.append(("nic.%d" % len(instance.nics), "remove"))
      elif nic_op == constants.DDM_ADD:
        # mac and bridge should be set, by now
        mac = nic_dict['mac']
        ip = nic_dict.get('ip', None)
        nicparams = self.nic_pinst[constants.DDM_ADD]
        new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
        instance.nics.append(new_nic)
        result.append(("nic.%d" % (len(instance.nics) - 1),
                       "add:mac=%s,ip=%s,mode=%s,link=%s" %
                       (new_nic.mac, new_nic.ip,
                        self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
                        self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
                       )))
      else:
        for key in 'mac', 'ip':
          if key in nic_dict:
            setattr(instance.nics[nic_op], key, nic_dict[key])
        if nic_op in self.nic_pnew:
          instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
        for key, val in nic_dict.iteritems():
          result.append(("nic.%s/%d" % (key, nic_op), val))

    # hvparams changes
    if self.op.hvparams:
      instance.hvparams = self.hv_inst
      for key, val in self.op.hvparams.iteritems():
        result.append(("hv/%s" % key, val))

    # beparams changes
    if self.op.beparams:
      instance.beparams = self.be_inst
      for key, val in self.op.beparams.iteritems():
        result.append(("be/%s" % key, val))

    self.cfg.Update(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = ['nodes']
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}
    self.share_locks[locking.LEVEL_NODE] = 1
    if not self.op.nodes:
      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
    else:
      self.needed_locks[locking.LEVEL_NODE] = \
        _GetWantedNodes(self, self.op.nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    self.nodes = self.acquired_locks[locking.LEVEL_NODE]

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    @rtype: dict
    @return: a dictionary with the structure node->(export-list)
        where export-list is a list of the instances exported on
        that node.

    """
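    # e.g. {"node1.example.com": ["inst1.example.com", "inst2.example.com"],
    #       "node2.example.com": False} -- False marks a node that failed
    # to answer (illustrative names)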
    rpcresult = self.rpc.call_export_list(self.nodes)
    result = {}
    for node in rpcresult:
      if rpcresult[node].failed:
        result[node] = False
      else:
        result[node] = rpcresult[node].data

    return result


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # FIXME: lock only instance primary and destination node
    #
    # Sad but true, for now we have to lock all nodes, as we don't know where
    # the previous export might be, and in this LU we search for it and
    # remove it from its current node. In the future we could fix this by:
    #  - making a tasklet to search (share-lock all), then create the new one,
    #    then one to remove, after
    #  - removing the removal operation altogether
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET

  def DeclareLocks(self, level):
    """Last minute lock declaration."""
    # All nodes are locked anyway, so nothing to do here.

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.op.instance_name
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    assert self.instance is not None, \
          "Cannot retrieve locked instance %s" % self.op.instance_name
    _CheckNodeOnline(self, self.instance.primary_node)

    self.dst_node = self.cfg.GetNodeInfo(
      self.cfg.ExpandNodeName(self.op.target_node))

    if self.dst_node is None:
      # This is a wrong node name, not a non-locked node
      raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
    _CheckNodeOnline(self, self.dst_node.name)
    _CheckNodeNotDrained(self, self.dst_node.name)

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      result = self.rpc.call_instance_shutdown(src_node, instance)
      msg = result.RemoteFailMsg()
      if msg:
        raise errors.OpExecError("Could not shutdown instance %s on"
                                 " node %s: %s" %
                                 (instance.name, src_node, msg))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    # set the disks ID correctly since call_instance_start needs the
    # correct drbd minor to create the symlinks
    for disk in instance.disks:
      self.cfg.SetDiskID(disk, src_node)

6434
      for disk in instance.disks:
6435
        # new_dev_name will be a snapshot of an lvm leaf of the one we passed
6436
        new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
6437
        if new_dev_name.failed or not new_dev_name.data:
6438
          self.LogWarning("Could not snapshot block device %s on node %s",
6439
                          disk.logical_id[1], src_node)
6440
          snap_disks.append(False)
6441
        else:
6442
          new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6443
                                 logical_id=(vgname, new_dev_name.data),
6444
                                 physical_id=(vgname, new_dev_name.data),
6445
                                 iv_name=disk.iv_name)
6446
          snap_disks.append(new_dev)
6447

    
6448
    finally:
6449
      if self.op.shutdown and instance.admin_up:
6450
        result = self.rpc.call_instance_start(src_node, instance, None, None)
6451
        msg = result.RemoteFailMsg()
6452
        if msg:
6453
          _ShutdownInstanceDisks(self, instance)
6454
          raise errors.OpExecError("Could not start instance: %s" % msg)
6455

    
6456
    # TODO: check for size
6457

    
6458
    cluster_name = self.cfg.GetClusterName()
6459
    for idx, dev in enumerate(snap_disks):
6460
      if dev:
6461
        result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6462
                                               instance, cluster_name, idx)
6463
        if result.failed or not result.data:
6464
          self.LogWarning("Could not export block device %s from node %s to"
6465
                          " node %s", dev.logical_id[1], src_node,
6466
                          dst_node.name)
6467
        msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6468
        if msg:
6469
          self.LogWarning("Could not remove snapshot block device %s from node"
6470
                          " %s: %s", dev.logical_id[1], src_node, msg)
6471

    
6472
    result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6473
    if result.failed or not result.data:
6474
      self.LogWarning("Could not finalize export for instance %s on node %s",
6475
                      instance.name, dst_node.name)
6476

    
6477
    nodelist = self.cfg.GetNodeList()
6478
    nodelist.remove(dst_node.name)
6479

    
6480
    # on one-node clusters nodelist will be empty after the removal
6481
    # if we proceed the backup would be removed because OpQueryExports
6482
    # substitutes an empty list with the full cluster node list.
6483
    if nodelist:
6484
      exportlist = self.rpc.call_export_list(nodelist)
6485
      for node in exportlist:
6486
        if exportlist[node].failed:
6487
          continue
6488
        if instance.name in exportlist[node].data:
6489
          if not self.rpc.call_export_remove(node, instance.name):
6490
            self.LogWarning("Could not remove older export for instance %s"
6491
                            " on node %s", instance.name, node)
6492

    
6493

    
6494
class LURemoveExport(NoHooksLU):
6495
  """Remove exports related to the named instance.
6496

6497
  """
6498
  _OP_REQP = ["instance_name"]
6499
  REQ_BGL = False
6500

    
6501
  def ExpandNames(self):
6502
    self.needed_locks = {}
6503
    # We need all nodes to be locked in order for RemoveExport to work, but we
6504
    # don't need to lock the instance itself, as nothing will happen to it (and
6505
    # we can remove exports also for a removed instance)
6506
    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6507

    
6508
  def CheckPrereq(self):
6509
    """Check prerequisites.
6510
    """
6511
    pass
6512

    
6513
  def Exec(self, feedback_fn):
6514
    """Remove any export.
6515

6516
    """
6517
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6518
    # If the instance was not found we'll try with the name that was passed in.
6519
    # This will only work if it was an FQDN, though.
6520
    fqdn_warn = False
6521
    if not instance_name:
6522
      fqdn_warn = True
6523
      instance_name = self.op.instance_name
6524

    
6525
    exportlist = self.rpc.call_export_list(self.acquired_locks[
6526
      locking.LEVEL_NODE])
6527
    found = False
6528
    for node in exportlist:
6529
      if exportlist[node].failed:
6530
        self.LogWarning("Failed to query node %s, continuing" % node)
6531
        continue
6532
      if instance_name in exportlist[node].data:
6533
        found = True
6534
        result = self.rpc.call_export_remove(node, instance_name)
6535
        if result.failed or not result.data:
6536
          logging.error("Could not remove export for instance %s"
6537
                        " on node %s", instance_name, node)
6538

    
6539
    if fqdn_warn and not found:
6540
      feedback_fn("Export not found. If trying to remove an export belonging"
6541
                  " to a deleted instance please use its Fully Qualified"
6542
                  " Domain Name.")
6543

    
6544

    
6545
class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """

  def ExpandNames(self):
    self.needed_locks = {}
    if self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_NODE] = name
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.needed_locks[locking.LEVEL_INSTANCE] = name

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      self.target = self.cfg.GetNodeInfo(self.op.name)
    elif self.op.kind == constants.TAG_INSTANCE:
      self.target = self.cfg.GetInstanceInfo(self.op.name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]
  REQ_BGL = False

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = cfg.GetAllInstancesInfo().values()
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = cfg.GetAllNodesInfo().values()
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]
  REQ_BGL = False

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        node_result.Raise()
        if not node_result.data:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result.data))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg that is needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus", "hypervisor",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

6780
    self.lu = lu
6781
    # init buffer variables
6782
    self.in_text = self.out_text = self.in_data = self.out_data = None
6783
    # init all input fields so that pylint is happy
6784
    self.mode = mode
6785
    self.name = name
6786
    self.mem_size = self.disks = self.disk_template = None
6787
    self.os = self.tags = self.nics = self.vcpus = None
6788
    self.hypervisor = None
6789
    self.relocate_from = None
6790
    # computed fields
6791
    self.required_nodes = None
6792
    # init result fields
6793
    self.success = self.info = self.nodes = None
6794
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6795
      keyset = self._ALLO_KEYS
6796
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6797
      keyset = self._RELO_KEYS
6798
    else:
6799
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6800
                                   " IAllocator" % self.mode)
6801
    for key in kwargs:
6802
      if key not in keyset:
6803
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
6804
                                     " IAllocator" % key)
6805
      setattr(self, key, kwargs[key])
6806
    for key in keyset:
6807
      if key not in kwargs:
6808
        raise errors.ProgrammerError("Missing input parameter '%s' to"
6809
                                     " IAllocator" % key)
6810
    self._BuildInputData()
6811

    
6812
  def _ComputeClusterData(self):
6813
    """Compute the generic allocator input data.
6814

6815
    This is the data that is independent of the actual operation.
6816

6817
    """
6818
    cfg = self.lu.cfg
6819
    cluster_info = cfg.GetClusterInfo()
6820
    # cluster data
6821
    data = {
6822
      "version": constants.IALLOCATOR_VERSION,
6823
      "cluster_name": cfg.GetClusterName(),
6824
      "cluster_tags": list(cluster_info.GetTags()),
6825
      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6826
      # we don't have job IDs
6827
      }
6828
    iinfo = cfg.GetAllInstancesInfo().values()
6829
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6830

    
6831
    # node data
6832
    node_results = {}
6833
    node_list = cfg.GetNodeList()
6834

    
6835
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6836
      hypervisor_name = self.hypervisor
6837
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6838
      hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6839

    
6840
    node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6841
                                           hypervisor_name)
6842
    node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6843
                       cluster_info.enabled_hypervisors)
6844
    for nname, nresult in node_data.items():
6845
      # first fill in static (config-based) values
6846
      ninfo = cfg.GetNodeInfo(nname)
6847
      pnr = {
6848
        "tags": list(ninfo.GetTags()),
6849
        "primary_ip": ninfo.primary_ip,
6850
        "secondary_ip": ninfo.secondary_ip,
6851
        "offline": ninfo.offline,
6852
        "drained": ninfo.drained,
6853
        "master_candidate": ninfo.master_candidate,
6854
        }
6855

    
6856
      if not ninfo.offline:
6857
        nresult.Raise()
6858
        if not isinstance(nresult.data, dict):
6859
          raise errors.OpExecError("Can't get data for node %s" % nname)
6860
        remote_info = nresult.data
6861
        for attr in ['memory_total', 'memory_free', 'memory_dom0',
6862
                     'vg_size', 'vg_free', 'cpu_total']:
6863
          if attr not in remote_info:
6864
            raise errors.OpExecError("Node '%s' didn't return attribute"
6865
                                     " '%s'" % (nname, attr))
6866
          try:
6867
            remote_info[attr] = int(remote_info[attr])
6868
          except ValueError, err:
6869
            raise errors.OpExecError("Node '%s' returned invalid value"
6870
                                     " for '%s': %s" % (nname, attr, err))
6871
        # compute memory used by primary instances
6872
        i_p_mem = i_p_up_mem = 0
6873
        for iinfo, beinfo in i_list:
6874
          if iinfo.primary_node == nname:
6875
            i_p_mem += beinfo[constants.BE_MEMORY]
6876
            if iinfo.name not in node_iinfo[nname].data:
6877
              i_used_mem = 0
6878
            else:
6879
              i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6880
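            # subtract from the node's free memory whatever the instance is
            # entitled to (its BE memory) but does not currently use, so the
            # allocator sees the fully committed amount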
            i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6881
            remote_info['memory_free'] -= max(0, i_mem_diff)
6882

    
6883
            if iinfo.admin_up:
6884
              i_p_up_mem += beinfo[constants.BE_MEMORY]
6885

    
6886
        # compute memory used by instances
6887
        pnr_dyn = {
6888
          "total_memory": remote_info['memory_total'],
6889
          "reserved_memory": remote_info['memory_dom0'],
6890
          "free_memory": remote_info['memory_free'],
6891
          "total_disk": remote_info['vg_size'],
6892
          "free_disk": remote_info['vg_free'],
6893
          "total_cpus": remote_info['cpu_total'],
6894
          "i_pri_memory": i_p_mem,
6895
          "i_pri_up_memory": i_p_up_mem,
6896
          }
6897
        pnr.update(pnr_dyn)
6898

    
6899
      node_results[nname] = pnr
6900
    data["nodes"] = node_results
6901

    # instance data
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "admin_up": iinfo.admin_up,
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MEMORY],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
                                                 pir["disks"])
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to the allocator structure.

    This, in combination with _ComputeClusterData, will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data

    disk_space = _ComputeDiskSize(self.disk_template, self.disks)

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request
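
    # The resulting request is therefore a dict along these lines (all
    # names and values below are purely illustrative, not real data):
    #   {"type": "allocate", "name": "new-instance.example.com",
    #    "disk_template": "drbd", "tags": [], "os": "debian-etch",
    #    "vcpus": 1, "memory": 512,
    #    "disks": [{"size": 1024, "mode": "w"}], "disk_space_total": 1152,
    #    "nics": [{"mac": "aa:00:00:00:00:01", "ip": None,
    #              "bridge": "xen-br0"}],
    #    "required_nodes": 2}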

  def _AddRelocateInstance(self):
    """Add relocate instance data to the allocator structure.

    This, in combination with _ComputeClusterData, will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.lu.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1
    disk_sizes = [{'size': disk.size} for disk in instance.disks]
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request
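
    # The relocate request is correspondingly simpler (values again
    # purely illustrative):
    #   {"type": "relocate", "name": "inst1.example.com",
    #    "disk_space_total": 1152, "required_nodes": 1,
    #    "relocate_from": ["node2.example.com"]}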

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)
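    # self.in_text now holds the serialized (JSON, via the serializer
    # module) document that the iallocator runner will hand to the
    # external allocator script.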

  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    """
    if call_fn is None:
      call_fn = self.lu.rpc.call_iallocator_runner
    data = self.in_text

    result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
    result.Raise()

    if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")
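
    # The runner returns a 4-element tuple: the run status code, the
    # allocator's standard output, its standard error, and a failure
    # reason (used below only when the run itself failed).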
    rcode, stdout, stderr, fail = result.data

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" % (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This parses the allocator output and, if it is valid, saves the
    result in self.out_data and in the success, info and nodes
    attributes.

    """
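    # The allocator is expected to emit a JSON document shaped roughly
    # like the following (an illustrative example, not real output):
    #   {"success": true,
    #    "info": "allocation successful",
    #    "nodes": ["node1.example.com", "node2.example.com"]}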
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the requested
    direction and mode of the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
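      # Each NIC row must be a dict with "mac", "ip" and "bridge" keys,
      # and each disk row a dict like {"size": <int>, "mode": "r" or "w"},
      # e.g. [{"size": 1024, "mode": "w"}] (illustrative values only).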
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
      if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
        self.op.hypervisor = self.cfg.GetHypervisorType()
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)
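
    # An external allocator is only invoked for the "out" direction, so
    # its name is required only in that case.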
    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       hypervisor=self.op.hypervisor,
                       )
    else:
      ial = IAllocator(self,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )
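
    # "in" direction: just return the generated allocator input for
    # inspection; "out" direction: actually run the named allocator and
    # return its raw, unvalidated output text.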
    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result